diff --git a/.gitignore b/.gitignore
index 4a20f5b..db8bc0b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
 .idea
+.run
 .vscode
 /rabbitmq-data
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
new file mode 100644
index 0000000..a49a246
--- /dev/null
+++ b/.gitlab-ci.yml
@@ -0,0 +1,41 @@
+# This file is a template, and might need editing before it works on your project.
+# This is a sample GitLab CI/CD configuration file that should run without any modifications.
+# It demonstrates a basic two-stage CI/CD pipeline (build and test) that runs
+# real cargo build, fmt, clippy, and test jobs rather than echo placeholders.
+#
+# A pipeline is composed of independent jobs that run scripts, grouped into stages.
+# Stages run in sequential order, but jobs within stages run in parallel.
+#
+# For more information, see: https://docs.gitlab.com/ee/ci/yaml/index.html#stages
+#
+# You can copy and paste this template into a new `.gitlab-ci.yml` file.
+# You should not add this template to an existing `.gitlab-ci.yml` file by using the `include:` keyword.
+#
+# To contribute improvements to CI/CD templates, please follow the Development guide at:
+# https://docs.gitlab.com/ee/development/cicd/templates.html
+# This specific template is located at:
+# https://gitlab.com/gitlab-org/gitlab/-/blob/master/lib/gitlab/ci/templates/Getting-Started.gitlab-ci.yml
+
+stages: # List of stages for jobs, and their order of execution
+  - build
+  - test
+
+build-job: # This job runs in the build stage, which runs first.
+  stage: build
+  script:
+    - cargo build --verbose
+
+fmt-job:
+  stage: test
+  script:
+    - cargo fmt --all --verbose --check
+
+clippy-job:
+  stage: test
+  script:
+    - cargo clippy --all --all-features
+
+unit-test-job: # This job runs in the test stage.
+  stage: test # It only starts when the job in the build stage completes successfully.
+  script:
+    - cargo test --all --verbose
diff --git a/Cargo.lock b/Cargo.lock
index 9077e7e..ab6d1d1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -867,6 +867,18 @@ version = "2.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2"
 
+[[package]]
+name = "dateparser"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c2ef451feee09ae5ecd8a02e738bd9adee9266b8fa9b44e22d3ce968d8694238"
+dependencies = [
+ "anyhow",
+ "chrono",
+ "lazy_static",
+ "regex",
+]
+
 [[package]]
 name = "deadpool"
 version = "0.10.0"
@@ -2171,6 +2183,7 @@ dependencies = [
  "chrono",
  "config",
  "console-subscriber",
+ "dateparser",
  "derive_builder",
  "getset",
  "html2text",
diff --git a/Cargo.toml b/Cargo.toml
index 3346cdc..5669c0d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,6 +15,7 @@ test-publish-rabbit = []
 anyhow = "^1.0"
 async-trait = "^0.1"
 config = "^0.14"
+dateparser = "^0.2"
 derive_builder = "^0.20"
 getset = "^0.1"
 lapin = "^2.5"
diff --git a/README.md b/README.md
index 8fe534c..d5a2375 100644
--- a/README.md
+++ b/README.md
@@ -33,7 +33,7 @@ with following pre-processing steps:
 1. Clone the repository:
    ```shell
-   git clone https://github.com/breadrock1/news-rss.git
+   git clone http:///data-lake/news-rss.git
    cd news-rss
    ```
diff --git a/config/production.toml b/config/production.toml
index 9be2691..c81916c 100644
--- a/config/production.toml
+++ b/config/production.toml
@@ -30,10 +30,10 @@ password = "postgres"
 max_pool_size = 10
 
 [storage.pgsql]
-address = "localhost:5432"
-database = "ai-crawler"
-username = "postgres"
-password = "postgres"
+address = "postgres:5432"
+database = "agregator"
+username = "agregator"
+password = "agregator_password"
 max_pool_size = 10
 
 [crawler.llm]
diff --git a/src/cache/redis/mod.rs b/src/cache/redis/mod.rs
index 41fca55..9943f09 100644
--- a/src/cache/redis/mod.rs
+++ b/src/cache/redis/mod.rs
@@ -40,13 +40,13 @@ impl CacheService for RedisClient {
         let cxt = self.client.write().await;
         match cxt.get_multiplexed_tokio_connection().await {
             Err(err) => {
-                tracing::warn!("cache: failed to get redis connection {err:#?}");
+                tracing::warn!(err=?err, "cache: failed to get redis connection");
                 return;
             }
             Ok(mut conn) => {
                 let store_result: RedisResult<()> = conn.set_ex(key, value, expired_secs).await;
                 if let Err(err) = store_result {
-                    tracing::warn!("cache: failed to store value to redis: {err:#?}");
+                    tracing::warn!(err=?err, "cache: failed to store value to redis");
                     return;
                 }
             }
@@ -58,7 +58,7 @@
         match cxt.get_multiplexed_tokio_connection().await {
             Ok(mut conn) => conn.get::<&str, PublishNews>(key).await.ok().is_some(),
             Err(err) => {
-                tracing::warn!("failed to get redis service connection {err:#?}");
+                tracing::warn!(err=?err, "failed to get redis service connection");
                 false
             }
         }
diff --git a/src/cache/redis/models.rs b/src/cache/redis/models.rs
index d1241d0..6bd8070 100644
--- a/src/cache/redis/models.rs
+++ b/src/cache/redis/models.rs
@@ -13,7 +13,7 @@ impl redis::ToRedisArgs for PublishNews {
         match serde_json::to_string(self) {
             Ok(json_str) => out.write_arg_fmt(json_str),
             Err(err) => {
-                tracing::error!("cacher: failed to serialize search parameters: {err:#?}");
+                tracing::error!(err=?err, "cacher: failed to serialize search parameters");
             }
         }
     }
@@ -26,7 +26,7 @@
             serde_json::from_slice::<Self>(data.as_slice()).map_err(RedisError::from)
         }
         _ => {
-            let err = serde_json::Error::custom("failed to extract redis value type");
+            let err = serde_json::Error::custom("failed to extract redis value");
             Err(RedisError::from(err))
         }
     }
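Review note (not part of the patch): the change repeated across these files is moving errors out of the message format string and into structured tracing fields, where the `?` sigil records a value with its `Debug` impl. A minimal runnable sketch of the two forms, assuming the `tracing` and `tracing-subscriber` crates (the subscriber is only needed to make the output visible):

```rust
use std::io::{Error, ErrorKind};

fn main() {
    // Print event output to stdout so the difference is visible.
    tracing_subscriber::fmt().init();

    let err = Error::new(ErrorKind::ConnectionRefused, "redis is down");

    // Before: the error is flattened into the message text.
    tracing::warn!("cache: failed to get redis connection {err:#?}");

    // After: the error travels as a named `err` field recorded via its
    // Debug impl (the `?` sigil), so log collectors can index and
    // filter on it independently of the message string.
    tracing::warn!(err = ?err, "cache: failed to get redis connection");
}
```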
diff --git a/src/crawler/llm/mod.rs b/src/crawler/llm/mod.rs
index 6e62d2c..60d27e7 100644
--- a/src/crawler/llm/mod.rs
+++ b/src/crawler/llm/mod.rs
@@ -65,7 +65,7 @@ impl CrawlerService for LlmCrawler {
         match retriever::extract_semantic_blocks(&content) {
             Ok(extracted) => Ok(extracted),
             Err(err) => {
-                tracing::error!("failed to extract semantic blocks from llm: {err:#?}");
+                tracing::error!(err=?err, "failed to extract semantic blocks from llm");
                 Ok(content)
             }
         }
@@ -78,14 +78,14 @@
             .await?
             .error_for_status()
             .map_err(|err| {
-                tracing::error!(err=?err, "failed to send request to url: {url}");
+                tracing::error!(err=?err, url=url, "failed to send request to url");
                 err
             })?;
 
         let html_str = response.text().await?;
         let html_str = match html_editor::parse(&html_str) {
             Err(err) => {
-                tracing::error!("failed to parse html: {err}");
+                tracing::error!(err=?err, "failed to parse html");
                 html_str
             }
             Ok(mut dom) => dom
diff --git a/src/crawler/llm/retriever.rs b/src/crawler/llm/retriever.rs
index e7014ed..9a4082c 100644
--- a/src/crawler/llm/retriever.rs
+++ b/src/crawler/llm/retriever.rs
@@ -48,7 +48,7 @@ pub fn extract_json_semantic_blocks(text_data: &str) -> Result
             Ok(data) => Some(data),
             Err(err) => {
-                tracing::warn!("failed while extracting json object: {err:#?}");
+                tracing::warn!(err=?err, "failed while extracting json object");
                 None
             }
         })
diff --git a/src/feeds/rss_feeds/mod.rs b/src/feeds/rss_feeds/mod.rs
index d21a6f1..b948dde 100644
--- a/src/feeds/rss_feeds/mod.rs
+++ b/src/feeds/rss_feeds/mod.rs
@@ -11,7 +11,7 @@ use crate::feeds::FetchTopic;
 use crate::publish::models::PublishNews;
 use crate::publish::Publisher;
 
-use chrono::NaiveDateTime;
+use chrono::Utc;
 use getset::{CopyGetters, Getters};
 use regex::Regex;
 use reqwest::Url;
@@ -79,13 +79,13 @@ where
             Ok(channel) => {
                 let topic = &channel.title().to_string();
                 if let Err(err) = self.processing_event(channel).await {
-                    tracing::error!("{topic}: failed while processing rss event: {err:#?}");
+                    tracing::error!(err=?err, topic=topic, "failed while processing rss event");
                     continue;
                 };
             }
             Err(err) => {
-                tracing::error!("failed to fetch rss channel: {err:#?}");
-                continue;
+                tracing::error!(err=?err, "failed to fetch rss channel");
+                return Err(err.into());
             }
         }
     }
@@ -114,20 +114,23 @@ pub async fn processing_event(&self, channel: rss::Channel) -> Result<(), anyhow::Error> {
         let topic = channel.title();
-        tracing::info!("{topic}: received new content from {topic}");
+        tracing::info!("{topic}: received new content");
 
         for item in channel.items() {
             let response = match self.extract_item(item).await {
                 Ok(it) => it,
                 Err(err) => {
-                    tracing::error!("{topic}: failed while converting rss item: {err:#?}");
+                    tracing::error!(err=?err, "{topic}: failed while converting rss item");
                     continue;
                 }
             };
 
             let art_id = response.guid();
             if self.cacher().contains(art_id).await {
-                tracing::warn!("news article {art_id} has been already parsed");
+                tracing::warn!(
+                    article = art_id,
+                    "{topic}: news article has already been parsed"
+                );
                 continue;
             }
@@ -135,11 +138,14 @@ where
             let art_id = art.id();
             let publish = self.publisher();
             if let Err(err) = publish.publish(&art).await {
-                tracing::error!("{topic}: failed to send news article {art_id}: {err:#?}");
+                tracing::error!(err=?err, article=art_id, "{topic}: failed to send article");
                 continue;
             }
 
-            tracing::info!("{topic}: article {art_id} published successful");
+            tracing::info!(
+                article = art_id,
+                "{topic}: article has been published successfully"
+            );
             self.cacher.set(art_id, &art).await;
         }
@@ -174,7 +180,13 @@ where
         let pub_date = item
             .pub_date()
-            .map(|it| NaiveDateTime::from_str(it).unwrap_or_default())
+            .map(|it| match dateparser::DateTimeUtc::from_str(it) {
+                Ok(time) => time.0.naive_utc(),
+                Err(err) => {
+                    tracing::warn!(err=?err, time=it, "failed to extract datetime");
+                    Utc::now().naive_utc()
+                }
+            })
             .unwrap_or_default();
 
         let photo_path = match item.itunes_ext() {
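Review note (not part of the patch): the `dateparser` swap is the behavioral core of this diff. RSS `<pubDate>` values are RFC 2822 strings, which `chrono::NaiveDateTime::from_str` (ISO 8601 only) rejects, so the old `unwrap_or_default()` quietly mapped every date to the Unix epoch. A sketch of the old and new mappings, assuming chrono `^0.4` and dateparser `^0.2` as pinned in Cargo.toml:

```rust
use std::str::FromStr;

use chrono::{NaiveDateTime, Utc};

fn main() {
    // A typical RSS <pubDate> value (RFC 2822).
    let pub_date = "Wed, 02 Oct 2024 13:30:00 GMT";

    // Old mapping: the ISO-8601-only parse fails, and
    // unwrap_or_default() silently yields 1970-01-01 00:00:00.
    let old = NaiveDateTime::from_str(pub_date).unwrap_or_default();
    assert_eq!(old, NaiveDateTime::default());

    // New mapping: dateparser accepts RFC 2822 among other formats;
    // on a parse failure the patch falls back to `now` instead.
    let new = match dateparser::DateTimeUtc::from_str(pub_date) {
        Ok(time) => time.0.naive_utc(),
        Err(_) => Utc::now().naive_utc(),
    };
    println!("pub_date parsed as {new}");
}
```

Note that an item with no `<pubDate>` at all still becomes the epoch through the outer `.unwrap_or_default()`, so the two fallback paths remain inconsistent.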
diff --git a/src/publish/pgsql/mod.rs b/src/publish/pgsql/mod.rs
index 52e8084..38a1ade 100644
--- a/src/publish/pgsql/mod.rs
+++ b/src/publish/pgsql/mod.rs
@@ -30,7 +30,7 @@ impl ServiceConnect for PgsqlPublisher {
         let address = config.address();
         let url = format!("postgresql://{user}:{passwd}@{address}/{db}");
 
-        tracing::info!("connecting to `{}` database", &url);
+        tracing::info!(db_url = url, "connecting to database");
         let connection = PgPoolOptions::default()
             .max_connections(config.max_pool_size())
             .connect(&url)
diff --git a/src/server/routers.rs b/src/server/routers.rs
index ef182e2..590ddbe 100644
--- a/src/server/routers.rs
+++ b/src/server/routers.rs
@@ -182,7 +182,7 @@ where
         let worker_name = form.target_url();
 
         if let Some(worker) = workers_guard.get(worker_name) {
-            tracing::info!("worker {worker_name} already exists");
+            tracing::info!(worker = worker_name, "worker already exists");
             if !worker.worker().is_finished() && !form.create_force() {
                 let msg = format!("worker {worker_name} already launched");
                 return Err(ServerError::Launched(msg));
diff --git a/src/storage/pgsql/mod.rs b/src/storage/pgsql/mod.rs
index 791777f..665b016 100644
--- a/src/storage/pgsql/mod.rs
+++ b/src/storage/pgsql/mod.rs
@@ -29,7 +29,7 @@ impl ServiceConnect for PgsqlTopicStorage {
         let address = config.address();
         let url = format!("postgresql://{user}:{passwd}@{address}/{db}");
 
-        tracing::info!("connecting to `{}` database", &url);
+        tracing::info!(url = url, "connecting to database");
         let connection = PgPoolOptions::default()
             .max_connections(config.max_pool_size())
             .connect(&url)
diff --git a/tests/mocks/mock_rmq_publish.rs b/tests/mocks/mock_rmq_publish.rs
index fc0fe8e..afba1f0 100644
--- a/tests/mocks/mock_rmq_publish.rs
+++ b/tests/mocks/mock_rmq_publish.rs
@@ -22,7 +22,8 @@ impl Publisher for MockRabbitPublisher {
     type Error = ();
 
     async fn publish(&self, msg_body: &PublishNews) -> Result<(), Self::Error> {
-        tracing::info!("rabbit confirm message successful: {}", msg_body.id());
+        let id = msg_body.id();
+        tracing::info!(article = id, "rabbit confirm msg successful");
         Ok(())
     }
 }
diff --git a/tests/tests_helper.rs b/tests/tests_helper.rs
index 95f43c1..0215c1c 100644
--- a/tests/tests_helper.rs
+++ b/tests/tests_helper.rs
@@ -91,12 +91,12 @@ pub async fn rabbit_consumer(queue: &str, config: &RabbitConfig) -> Result<(), anyhow::Error> {
     consumer.set_delegate(move |delivery: DeliveryResult| async move {
         let delivery = match delivery {
             Ok(Some(delivery)) => {
-                tracing::info!("delivered {delivery:?}");
+                tracing::info!(delivery=?delivery, "msg has been delivered");
                 delivery
             }
             Ok(None) => return,
             Err(err) => {
-                tracing::error!("failed to consume queue message {err:#?}");
+                tracing::error!(err=?err, "failed to consume queue msg");
                 return;
             }
         };
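Review note (not part of the patch): with `db_url = url` (and `url = url` in the storage module) the full connection string is now logged, and `url` is built as `postgresql://{user}:{passwd}@{address}/{db}`, so the production password from config/production.toml lands in the logs. A hedged sketch of a redacted alternative (plain `println!` stands in for the tracing call; the values mirror config/production.toml for illustration only):

```rust
fn main() {
    // Illustrative values taken from config/production.toml.
    let (user, passwd, address, db) =
        ("agregator", "agregator_password", "postgres:5432", "agregator");

    // What the patch logs: credentials included.
    let url = format!("postgresql://{user}:{passwd}@{address}/{db}");
    println!("connecting to database db_url={url}");

    // A safer field: redact the secret, keep host and database.
    let redacted = format!("postgresql://{user}:***@{address}/{db}");
    println!("connecting to database db_url={redacted}");
}
```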