Skip to content

Commit

Permalink
Fix: some rss changes (#14)
Browse files Browse the repository at this point in the history
* chore(actions): removed useless actions from repository

* chore(ci): added gitlab ci config file

* Merge: Checked local changes

* Update README.md

* Merge: merge local changes

* fix(compose): docker compose fixes

* Fix: fixed docker compose config

* Merged: merged local changes

* Improve: moved topics config to db (#11)

* chore(rmq): replaced direct to fanout exchange kind

* Improve: Merge local changes

* chore(git): updated .gitignore file

* fix(rss): return current datetime on parsing rss item error

* fix(rss): return err from spawned thread if rss error

* chore(tracing): updated tracing msg formatting

* fix(datetime): added dateparser crate

* merge(conflict): resolved conflicts

---------

Co-authored-by: Bread White <breadrock1@email.net>
  • Loading branch information
breadrock1 and Bread White authored Nov 28, 2024
1 parent e2a47da commit 9a4b91f
Show file tree
Hide file tree
Showing 16 changed files with 99 additions and 30 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.idea
.run
.vscode

/rabbitmq-data
Expand Down
41 changes: 41 additions & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# This file is a template, and might need editing before it works on your project.
# This is a sample GitLab CI/CD configuration file that should run without any modifications.
# It demonstrates a basic 2-stage CI/CD pipeline (build, then test) for a Rust project
# using cargo for compilation, formatting, linting, and unit tests.
#
# A pipeline is composed of independent jobs that run scripts, grouped into stages.
# Stages run in sequential order, but jobs within stages run in parallel.
#
# For more information, see: https://docs.gitlab.com/ee/ci/yaml/index.html#stages
#
# You can copy and paste this template into a new `.gitlab-ci.yml` file.
# You should not add this template to an existing `.gitlab-ci.yml` file by using the `include:` keyword.
#
# To contribute improvements to CI/CD templates, please follow the Development guide at:
# https://docs.gitlab.com/ee/development/cicd/templates.html
# This specific template is located at:
# https://gitlab.com/gitlab-org/gitlab/-/blob/master/lib/gitlab/ci/templates/Getting-Started.gitlab-ci.yml

stages: # List of stages for jobs, and their order of execution
  - build
  - test

build-job: # This job runs in the build stage, which runs first.
  stage: build
  script:
    - cargo build --verbose

# Checks that the code base is formatted per rustfmt defaults; fails on any diff.
fmt-job:
  stage: test
  script:
    - cargo fmt --all --verbose --check

# Runs clippy lints across the whole workspace with all features enabled.
clippy-job:
  stage: test
  script:
    - cargo clippy --all --all-features

unit-test-job: # This job runs in the test stage.
  stage: test # It only starts when the jobs in the build stage complete successfully.
  script:
    - cargo test --all --verbose
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ test-publish-rabbit = []
anyhow = "^1.0"
async-trait = "^0.1"
config = "^0.14"
dateparser = "^0.2"
derive_builder = "^0.20"
getset = "^0.1"
lapin = "^2.5"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ with following pre-processing steps:
1. Clone the repository:

```shell
git clone https://github.com/breadrock1/news-rss.git
git clone http://<gitlab-domain-address>/data-lake/news-rss.git
cd news-rss
```

Expand Down
8 changes: 4 additions & 4 deletions config/production.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ password = "postgres"
max_pool_size = 10

[storage.pgsql]
address = "localhost:5432"
database = "ai-crawler"
username = "postgres"
password = "postgres"
address = "postgres:5432"
database = "agregator"
username = "agregator"
password = "agregator_password"
max_pool_size = 10

[crawler.llm]
Expand Down
6 changes: 3 additions & 3 deletions src/cache/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ impl CacheService for RedisClient {
let cxt = self.client.write().await;
match cxt.get_multiplexed_tokio_connection().await {
Err(err) => {
tracing::warn!("cache: failed to get redis connection {err:#?}");
tracing::warn!(err=?err, "cache: failed to get redis connection");
return;
}
Ok(mut conn) => {
let store_result: RedisResult<()> = conn.set_ex(key, value, expired_secs).await;
if let Err(err) = store_result {
tracing::warn!("cache: failed to store value to redis: {err:#?}");
tracing::warn!(err=?err, "cache: failed to store value to redis");
return;
}
}
Expand All @@ -58,7 +58,7 @@ impl CacheService for RedisClient {
match cxt.get_multiplexed_tokio_connection().await {
Ok(mut conn) => conn.get::<&str, PublishNews>(key).await.ok().is_some(),
Err(err) => {
tracing::warn!("failed to get redis service connection {err:#?}");
tracing::warn!(err=?err, "failed to get redis service connection");
false
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/cache/redis/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ impl redis::ToRedisArgs for PublishNews {
match serde_json::to_string(self) {
Ok(json_str) => out.write_arg_fmt(json_str),
Err(err) => {
tracing::error!("cacher: failed to serialize search parameters: {err:#?}");
tracing::error!(err=?err, "cacher: failed to serialize search parameters");
}
}
}
Expand All @@ -26,7 +26,7 @@ impl redis::FromRedisValue for PublishNews {
serde_json::from_slice::<PublishNews>(data.as_slice()).map_err(RedisError::from)
}
_ => {
let err = serde_json::Error::custom("failed to extract redis value type");
let err = serde_json::Error::custom("failed to extract redis value");
Err(RedisError::from(err))
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/crawler/llm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl CrawlerService for LlmCrawler {
match retriever::extract_semantic_blocks(&content) {
Ok(extracted) => Ok(extracted),
Err(err) => {
tracing::error!("failed to extract semantic blocks from llm: {err:#?}");
tracing::error!(err=?err, "failed to extract semantic blocks from llm");
Ok(content)
}
}
Expand All @@ -78,14 +78,14 @@ impl CrawlerService for LlmCrawler {
.await?
.error_for_status()
.map_err(|err| {
tracing::error!(err=?err, "failed to send request to url: {url}");
tracing::error!(err=?err, url=url, "failed to send request to url");
err
})?;

let html_str = response.text().await?;
let html_str = match html_editor::parse(&html_str) {
Err(err) => {
tracing::error!("failed to parse html: {err}");
tracing::error!(err = err, "failed to parse html");
html_str
}
Ok(mut dom) => dom
Expand Down
2 changes: 1 addition & 1 deletion src/crawler/llm/retriever.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub fn extract_json_semantic_blocks(text_data: &str) -> Result<String, anyhow::E
.filter_map(|it| match extract_json_object(it.as_str()) {
Ok(data) => Some(data),
Err(err) => {
tracing::warn!("failed while extracting json object: {err:#?}");
tracing::warn!(err=?err, "failed while extracting json object");
None
}
})
Expand Down
32 changes: 22 additions & 10 deletions src/feeds/rss_feeds/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::feeds::FetchTopic;
use crate::publish::models::PublishNews;
use crate::publish::Publisher;

use chrono::NaiveDateTime;
use chrono::Utc;
use getset::{CopyGetters, Getters};
use regex::Regex;
use reqwest::Url;
Expand Down Expand Up @@ -79,13 +79,13 @@ where
Ok(channel) => {
let topic = &channel.title().to_string();
if let Err(err) = self.processing_event(channel).await {
tracing::error!("{topic}: failed while processing rss event: {err:#?}");
tracing::error!(err=?err, topic=topic, "failed while processing rss event");
continue;
};
}
Err(err) => {
tracing::error!("failed to fetch rss channel: {err:#?}");
continue;
tracing::error!(err=?err, "failed to fetch rss channel");
return Err(err.into());
}
}
}
Expand Down Expand Up @@ -114,32 +114,38 @@ where

pub async fn processing_event(&self, channel: rss::Channel) -> Result<(), anyhow::Error> {
let topic = channel.title();
tracing::info!("{topic}: received new content from {topic}");
tracing::info!("{topic}: received new content");

for item in channel.items() {
let response = match self.extract_item(item).await {
Ok(it) => it,
Err(err) => {
tracing::error!("{topic}: failed while converting rss item: {err:#?}");
tracing::error!(err=?err, "{topic}: failed while converting rss item");
continue;
}
};

let art_id = response.guid();
if self.cacher().contains(art_id).await {
tracing::warn!("news article {art_id} has been already parsed");
tracing::warn!(
article = art_id,
"{topic}: news article has been already parsed"
);
continue;
}

let art = PublishNews::from(response);
let art_id = art.id();
let publish = self.publisher();
if let Err(err) = publish.publish(&art).await {
tracing::error!("{topic}: failed to send news article {art_id}: {err:#?}");
tracing::error!(err=?err, article=art_id, "{topic}: failed to send article");
continue;
}

tracing::info!("{topic}: article {art_id} published successful");
tracing::info!(
article = art_id,
"{topic}: article has been published successful"
);
self.cacher.set(art_id, &art).await;
}

Expand Down Expand Up @@ -174,7 +180,13 @@ where

let pub_date = item
.pub_date()
.map(|it| NaiveDateTime::from_str(it).unwrap_or_default())
.map(|it| match dateparser::DateTimeUtc::from_str(it) {
Ok(time) => time.0.naive_utc(),
Err(err) => {
tracing::warn!(err=?err, time=it, "failed to extract datetime");
Utc::now().naive_utc()
}
})
.unwrap_or_default();

let photo_path = match item.itunes_ext() {
Expand Down
2 changes: 1 addition & 1 deletion src/publish/pgsql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl ServiceConnect for PgsqlPublisher {
let address = config.address();

let url = format!("postgresql://{user}:{passwd}@{address}/{db}");
tracing::info!("connecting to `{}` database", &url);
tracing::info!(db_url = url, "connecting to database");
let connection = PgPoolOptions::default()
.max_connections(config.max_pool_size())
.connect(&url)
Expand Down
2 changes: 1 addition & 1 deletion src/server/routers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ where

let worker_name = form.target_url();
if let Some(worker) = workers_guard.get(worker_name) {
tracing::info!("worker {worker_name} already exists");
tracing::info!(worker = worker_name, "worker already exists");
if !worker.worker().is_finished() && !form.create_force() {
let msg = format!("worker {worker_name} already launched");
return Err(ServerError::Launched(msg));
Expand Down
2 changes: 1 addition & 1 deletion src/storage/pgsql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl ServiceConnect for PgsqlTopicStorage {
let address = config.address();

let url = format!("postgresql://{user}:{passwd}@{address}/{db}");
tracing::info!("connecting to `{}` database", &url);
tracing::info!(url = url, "connecting to database");
let connection = PgPoolOptions::default()
.max_connections(config.max_pool_size())
.connect(&url)
Expand Down
3 changes: 2 additions & 1 deletion tests/mocks/mock_rmq_publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ impl Publisher for MockRabbitPublisher {
type Error = ();

async fn publish(&self, msg_body: &PublishNews) -> Result<(), Self::Error> {
tracing::info!("rabbit confirm message successful: {}", msg_body.id());
let id = msg_body.id();
tracing::info!(article = id, "rabbit confirm msg successful");
Ok(())
}
}
4 changes: 2 additions & 2 deletions tests/tests_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ pub async fn rabbit_consumer(queue: &str, config: &RabbitConfig) -> Result<(), a
consumer.set_delegate(move |delivery: DeliveryResult| async move {
let delivery = match delivery {
Ok(Some(delivery)) => {
tracing::info!("delivered {delivery:?}");
tracing::info!(delivery=?delivery, "msg has been delivered");
delivery
}
Ok(None) => return,
Err(err) => {
tracing::error!("failed to consume queue message {err:#?}");
tracing::error!(err=?err, "failed to consume queue msg");
return;
}
};
Expand Down

0 comments on commit 9a4b91f

Please sign in to comment.