Fix: some rss changes #14

Merged · 49 commits · Nov 28, 2024

Commits

ff7e7d0  chore(actions): removed useless actions from repository (Nov 6, 2024)
346029a  chore(ci): added gitlab ci config file (Nov 6, 2024)
87394aa  Merge: Checked local changes (Nov 6, 2024)
51e4189  Merge branch 'merge/merge-local-changes' into 'master' (Nov 6, 2024)
d9457fb  Update README.md (Nov 6, 2024)
875d4c3  Merge branch 'master' of http://192.168.0.59:8080/data-lake/news-rss … (Nov 6, 2024)
db3ad1a  Merge branch 'master' into master-sova (Nov 12, 2024)
a0ad25d  Merge: merge local changes (Nov 12, 2024)
9c78322  Merge branch 'merge/merge-local-changes' into 'master' (Nov 12, 2024)
8434776  fix(compose): docker compose fixes (Nov 12, 2024)
4ceb8f2  Merge branch 'master' of github.com:breadrock1/news-rss (Nov 12, 2024)
61d2e10  Merge branch 'master' into master-sova (Nov 12, 2024)
877c0d0  Fix: fixed docker compose config (Nov 12, 2024)
a4676ea  Merge branch 'merge/merge-local-changes' into 'master' (Nov 12, 2024)
2f55a26  Merge branch 'master' of http://192.168.0.59:8080/data-lake/news-rss … (Nov 12, 2024)
f1947d2  Merge branch 'master' of github.com:breadrock1/news-rss (Nov 13, 2024)
c06f701  Merge branch 'master' into master-sova (Nov 13, 2024)
4c85fbf  Merge: local changes (Nov 14, 2024)
63a6cdc  Merge branch 'merge/merge-local-changess' into 'master' (Nov 14, 2024)
7cd088b  Merge branch 'master' of http://192.168.0.59:8080/data-lake/news-rss … (Nov 14, 2024)
b9fd80f  Merge branch 'merge/merge-local-changess' into 'master' (Nov 15, 2024)
6a8cafb  merge(local): merged local changes (Nov 15, 2024)
d3752b7  Merged: merged local changes (Nov 15, 2024)
cd3e516  Merge branch 'merge/merge-local-changes' into 'master' (Nov 15, 2024)
56e5ae8  Merge branch 'master' of http://192.168.0.59:8080/data-lake/news-rss … (Nov 15, 2024)
66adaca  Merge branch 'master' of github.com:breadrock1/news-rss (Nov 15, 2024)
27f1263  Merge branch 'master' into master-sova (Nov 15, 2024)
2981e06  Improve: moved topics config to db (#11) (Nov 15, 2024)
698d55b  Merge branch 'merge/merge-local-changes' into 'master' (Nov 15, 2024)
951e93e  Merge branch 'master' of http://192.168.0.59:8080/data-lake/news-rss … (Nov 15, 2024)
0411a86  Update production.toml (Nov 15, 2024)
0799f02  Merge branch 'master' of http://192.168.0.59:8080/data-lake/news-rss … (Nov 15, 2024)
b0ccafb  chore(rmq): replaced direct to fanout exchange kind (Nov 25, 2024)
6dff21f  Merge branch 'master' of github.com:breadrock1/news-rss (Nov 25, 2024)
9e752b3  Merge branch 'master' of github.com:breadrock1/news-rss (Nov 25, 2024)
e73561c  Merge branch 'master' into master-sova (Nov 25, 2024)
de469df  Improve: Merge local changes (Nov 25, 2024)
ef9abcb  Merge branch 'merge/local-changes' into 'master' (Nov 25, 2024)
ab4b37c  Merge branch 'master' of http://192.168.0.59:8080/data-lake/news-rss … (Nov 25, 2024)
a6c7ee4  chore(git): updated .gitignore file (Nov 28, 2024)
015e4dc  fix(rss): return current datetime on parsing rss item error (Nov 28, 2024)
936fa57  fix(rss): return err from spawned thread if rss error (Nov 28, 2024)
d9ac2e8  chore(tracing): updated tracing msg formatting (Nov 28, 2024)
a40ff87  chore(tracing): updated tracing msg formatting (Nov 28, 2024)
40b8abc  Merge branch 'master' into master-sova (Nov 28, 2024)
6289cb3  Merge branch 'chore/some-rss-changes-sova' into 'master' (Nov 28, 2024)
9761a3f  fix(datetime): added dateparser crate (Nov 28, 2024)
3a7bdbd  Merge branch 'fix/new-fixes' into 'master' (Nov 28, 2024)
58df147  merge(conflict): resolved conflicts (Nov 28, 2024)
Files changed
1 change: 1 addition & 0 deletions .gitignore
@@ -1,4 +1,5 @@
 .idea
 .run
+.vscode

 /rabbitmq-data
41 changes: 41 additions & 0 deletions .gitlab-ci.yml
@@ -0,0 +1,41 @@
+# This file is a template, and might need editing before it works on your project.
+# This is a sample GitLab CI/CD configuration file that should run without any modifications.
+# It demonstrates a basic 3 stage CI/CD pipeline. Instead of real tests or scripts,
+# it uses echo commands to simulate the pipeline execution.
+#
+# A pipeline is composed of independent jobs that run scripts, grouped into stages.
+# Stages run in sequential order, but jobs within stages run in parallel.
+#
+# For more information, see: https://docs.gitlab.com/ee/ci/yaml/index.html#stages
+#
+# You can copy and paste this template into a new `.gitlab-ci.yml` file.
+# You should not add this template to an existing `.gitlab-ci.yml` file by using the `include:` keyword.
+#
+# To contribute improvements to CI/CD templates, please follow the Development guide at:
+# https://docs.gitlab.com/ee/development/cicd/templates.html
+# This specific template is located at:
+# https://gitlab.com/gitlab-org/gitlab/-/blob/master/lib/gitlab/ci/templates/Getting-Started.gitlab-ci.yml
+
+stages: # List of stages for jobs, and their order of execution
+  - build
+  - test
+
+build-job: # This job runs in the build stage, which runs first.
+  stage: build
+  script:
+    - cargo build --verbose
+
+fmt-job:
+  stage: test
+  script:
+    - cargo fmt --all --verbose --check
+
+clippy-job:
+  stage: test
+  script:
+    - cargo clippy --all --all-features
+
+unit-test-job: # This job runs in the test stage.
+  stage: test # It only starts when the job in the build stage completes successfully.
+  script:
+    - cargo test --all --verbose
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -15,6 +15,7 @@ test-publish-rabbit = []
 anyhow = "^1.0"
 async-trait = "^0.1"
 config = "^0.14"
+dateparser = "^0.2"
 derive_builder = "^0.20"
 getset = "^0.1"
 lapin = "^2.5"
2 changes: 1 addition & 1 deletion README.md
@@ -33,7 +33,7 @@ with following pre-processing steps:
 1. Clone the repository:

 ```shell
-git clone https://github.com/breadrock1/news-rss.git
+git clone http://<gitlab-domain-address>/data-lake/news-rss.git
 cd news-rss
 ```

8 changes: 4 additions & 4 deletions config/production.toml
@@ -30,10 +30,10 @@ password = "postgres"
 max_pool_size = 10

 [storage.pgsql]
-address = "localhost:5432"
-database = "ai-crawler"
-username = "postgres"
-password = "postgres"
+address = "postgres:5432"
+database = "agregator"
+username = "agregator"
+password = "agregator_password"
 max_pool_size = 10

 [crawler.llm]
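For orientation, a hedged sketch of how a `[storage.pgsql]` block like this could be deserialized with the `config` crate the project already depends on (`PgsqlConfig` and its field names are assumptions drawn from the TOML keys above, not the repository's actual types):

```rust
use serde::Deserialize;

// Hypothetical config struct; fields mirror the TOML keys above.
#[derive(Debug, Deserialize)]
struct PgsqlConfig {
    address: String,
    database: String,
    username: String,
    password: String,
    max_pool_size: u32,
}

fn load_pgsql_config() -> Result<PgsqlConfig, config::ConfigError> {
    // Builder-style API of config 0.14: read config/production.toml and
    // pull out the nested [storage.pgsql] table.
    let settings = config::Config::builder()
        .add_source(config::File::with_name("config/production"))
        .build()?;
    settings.get::<PgsqlConfig>("storage.pgsql")
}
```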
6 changes: 3 additions & 3 deletions src/cache/redis/mod.rs
@@ -40,13 +40,13 @@ impl CacheService for RedisClient {
         let cxt = self.client.write().await;
         match cxt.get_multiplexed_tokio_connection().await {
             Err(err) => {
-                tracing::warn!("cache: failed to get redis connection {err:#?}");
+                tracing::warn!(err=?err, "cache: failed to get redis connection");
                 return;
             }
             Ok(mut conn) => {
                 let store_result: RedisResult<()> = conn.set_ex(key, value, expired_secs).await;
                 if let Err(err) = store_result {
-                    tracing::warn!("cache: failed to store value to redis: {err:#?}");
+                    tracing::warn!(err=?err, "cache: failed to store value to redis");
                     return;
                 }
             }
@@ -58,7 +58,7 @@
         match cxt.get_multiplexed_tokio_connection().await {
             Ok(mut conn) => conn.get::<&str, PublishNews>(key).await.ok().is_some(),
             Err(err) => {
-                tracing::warn!("failed to get redis service connection {err:#?}");
+                tracing::warn!(err=?err, "failed to get redis service connection");
                 false
             }
         }
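Most of the logging changes in this PR follow the same pattern as this hunk: the error value moves out of the interpolated message string and into a structured tracing field. A minimal standalone sketch of the difference (not code from this repository), using only the `tracing` crate:

```rust
use tracing::warn;

// Hypothetical helper illustrating both styles side by side.
fn log_connection_error(err: &std::io::Error) {
    // Before: the error is interpolated into the message, so subscribers
    // see a single opaque string.
    warn!("cache: failed to get redis connection {err:#?}");

    // After: `?err` records the error as a structured `err` field
    // (Debug-formatted), which log collectors can filter and index on.
    warn!(err = ?err, "cache: failed to get redis connection");
}
```

The same rewrite is applied below to the crawler, feed, storage, and test modules.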
4 changes: 2 additions & 2 deletions src/cache/redis/models.rs
@@ -13,7 +13,7 @@ impl redis::ToRedisArgs for PublishNews {
         match serde_json::to_string(self) {
             Ok(json_str) => out.write_arg_fmt(json_str),
             Err(err) => {
-                tracing::error!("cacher: failed to serialize search parameters: {err:#?}");
+                tracing::error!(err=?err, "cacher: failed to serialize search parameters");
             }
         }
     }
@@ -26,7 +26,7 @@
                 serde_json::from_slice::<PublishNews>(data.as_slice()).map_err(RedisError::from)
             }
             _ => {
-                let err = serde_json::Error::custom("failed to extract redis value type");
+                let err = serde_json::Error::custom("failed to extract redis value");
                 Err(RedisError::from(err))
             }
         }
6 changes: 3 additions & 3 deletions src/crawler/llm/mod.rs
@@ -65,7 +65,7 @@ impl CrawlerService for LlmCrawler {
         match retriever::extract_semantic_blocks(&content) {
             Ok(extracted) => Ok(extracted),
             Err(err) => {
-                tracing::error!("failed to extract semantic blocks from llm: {err:#?}");
+                tracing::error!(err=?err, "failed to extract semantic blocks from llm");
                 Ok(content)
             }
         }
@@ -78,14 +78,14 @@
             .await?
             .error_for_status()
             .map_err(|err| {
-                tracing::error!(err=?err, "failed to send request to url: {url}");
+                tracing::error!(err=?err, url=url, "failed to send request to url");
                 err
             })?;

         let html_str = response.text().await?;
         let html_str = match html_editor::parse(&html_str) {
             Err(err) => {
-                tracing::error!("failed to parse html: {err}");
+                tracing::error!(err = err, "failed to parse html");
                 html_str
             }
             Ok(mut dom) => dom
2 changes: 1 addition & 1 deletion src/crawler/llm/retriever.rs
@@ -48,7 +48,7 @@ pub fn extract_json_semantic_blocks(text_data: &str) -> Result<String, anyhow::Error>
         .filter_map(|it| match extract_json_object(it.as_str()) {
             Ok(data) => Some(data),
             Err(err) => {
-                tracing::warn!("failed while extracting json object: {err:#?}");
+                tracing::warn!(err=?err, "failed while extracting json object");
                 None
             }
         })
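This hunk is representative of the error-tolerant extraction style used throughout the crawler: a failed item is logged as a structured warning and skipped rather than aborting the whole batch. A generic sketch of that log-and-skip pattern (a hypothetical helper using `serde_json` in place of the repository's `extract_json_object`):

```rust
use serde_json::Value;

// Parse every item we can; log and drop the ones we cannot.
fn parse_all(raw_items: &[&str]) -> Vec<Value> {
    raw_items
        .iter()
        .filter_map(|it| match serde_json::from_str::<Value>(it) {
            Ok(data) => Some(data),
            Err(err) => {
                tracing::warn!(err = ?err, "failed while extracting json object");
                None
            }
        })
        .collect()
}
```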
32 changes: 22 additions & 10 deletions src/feeds/rss_feeds/mod.rs
@@ -11,7 +11,7 @@ use crate::feeds::FetchTopic;
 use crate::publish::models::PublishNews;
 use crate::publish::Publisher;

-use chrono::NaiveDateTime;
+use chrono::Utc;
 use getset::{CopyGetters, Getters};
 use regex::Regex;
 use reqwest::Url;
@@ -79,13 +79,13 @@
                 Ok(channel) => {
                     let topic = &channel.title().to_string();
                     if let Err(err) = self.processing_event(channel).await {
-                        tracing::error!("{topic}: failed while processing rss event: {err:#?}");
+                        tracing::error!(err=?err, topic=topic, "failed while processing rss event");
                         continue;
                     };
                 }
                 Err(err) => {
-                    tracing::error!("failed to fetch rss channel: {err:#?}");
-                    continue;
+                    tracing::error!(err=?err, "failed to fetch rss channel");
+                    return Err(err.into());
                 }
             }
         }
@@ -114,32 +114,38 @@

     pub async fn processing_event(&self, channel: rss::Channel) -> Result<(), anyhow::Error> {
         let topic = channel.title();
-        tracing::info!("{topic}: received new content from {topic}");
+        tracing::info!("{topic}: received new content");

         for item in channel.items() {
             let response = match self.extract_item(item).await {
                 Ok(it) => it,
                 Err(err) => {
-                    tracing::error!("{topic}: failed while converting rss item: {err:#?}");
+                    tracing::error!(err=?err, "{topic}: failed while converting rss item");
                     continue;
                 }
             };

             let art_id = response.guid();
             if self.cacher().contains(art_id).await {
-                tracing::warn!("news article {art_id} has been already parsed");
+                tracing::warn!(
+                    article = art_id,
+                    "{topic}: news article has been already parsed"
+                );
                 continue;
             }

             let art = PublishNews::from(response);
             let art_id = art.id();
             let publish = self.publisher();
             if let Err(err) = publish.publish(&art).await {
-                tracing::error!("{topic}: failed to send news article {art_id}: {err:#?}");
+                tracing::error!(err=?err, article=art_id, "{topic}: failed to send article");
                 continue;
             }

-            tracing::info!("{topic}: article {art_id} published successful");
+            tracing::info!(
+                article = art_id,
+                "{topic}: article has been published successful"
+            );
             self.cacher.set(art_id, &art).await;
         }

@@ -174,7 +180,13 @@
         let pub_date = item
             .pub_date()
-            .map(|it| NaiveDateTime::from_str(it).unwrap_or_default())
+            .map(|it| match dateparser::DateTimeUtc::from_str(it) {
+                Ok(time) => time.0.naive_utc(),
+                Err(err) => {
+                    tracing::warn!(err=?err, time=it, "failed to extract datetime");
+                    Utc::now().naive_utc()
+                }
+            })
             .unwrap_or_default();

         let photo_path = match item.itunes_ext() {
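The `pub_date` change above is the substance of the `fix(datetime)` commit: `NaiveDateTime::from_str` cannot parse the RFC 2822 dates RSS feeds actually emit, so `unwrap_or_default()` silently stamped items with the epoch default, whereas `dateparser` handles those formats and any residual failure now falls back to the current time. A standalone sketch of that fallback, mirroring the diff (`parse_pub_date` is a hypothetical helper, not a function from this repository):

```rust
use std::str::FromStr;

use chrono::{NaiveDateTime, Utc};

fn parse_pub_date(raw: &str) -> NaiveDateTime {
    match dateparser::DateTimeUtc::from_str(raw) {
        // dateparser accepts RFC 2822 ("Thu, 28 Nov 2024 10:00:00 GMT"),
        // RFC 3339, and several looser formats; `.0` is the inner DateTime<Utc>.
        Ok(time) => time.0.naive_utc(),
        Err(err) => {
            // Log the bad value and stamp the item with "now" instead of
            // the epoch default.
            tracing::warn!(err = ?err, time = raw, "failed to extract datetime");
            Utc::now().naive_utc()
        }
    }
}
```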
2 changes: 1 addition & 1 deletion src/publish/pgsql/mod.rs
@@ -30,7 +30,7 @@ impl ServiceConnect for PgsqlPublisher {
         let address = config.address();

         let url = format!("postgresql://{user}:{passwd}@{address}/{db}");
-        tracing::info!("connecting to `{}` database", &url);
+        tracing::info!(db_url = url, "connecting to database");
         let connection = PgPoolOptions::default()
             .max_connections(config.max_pool_size())
             .connect(&url)
2 changes: 1 addition & 1 deletion src/server/routers.rs
@@ -182,7 +182,7 @@

         let worker_name = form.target_url();
         if let Some(worker) = workers_guard.get(worker_name) {
-            tracing::info!("worker {worker_name} already exists");
+            tracing::info!(worker = worker_name, "worker already exists");
             if !worker.worker().is_finished() && !form.create_force() {
                 let msg = format!("worker {worker_name} already launched");
                 return Err(ServerError::Launched(msg));
2 changes: 1 addition & 1 deletion src/storage/pgsql/mod.rs
@@ -29,7 +29,7 @@ impl ServiceConnect for PgsqlTopicStorage {
         let address = config.address();

         let url = format!("postgresql://{user}:{passwd}@{address}/{db}");
-        tracing::info!("connecting to `{}` database", &url);
+        tracing::info!(url = url, "connecting to database");
         let connection = PgPoolOptions::default()
             .max_connections(config.max_pool_size())
             .connect(&url)
3 changes: 2 additions & 1 deletion tests/mocks/mock_rmq_publish.rs
@@ -22,7 +22,8 @@ impl Publisher for MockRabbitPublisher {
     type Error = ();

     async fn publish(&self, msg_body: &PublishNews) -> Result<(), Self::Error> {
-        tracing::info!("rabbit confirm message successful: {}", msg_body.id());
+        let id = msg_body.id();
+        tracing::info!(article = id, "rabbit confirm msg successful");
         Ok(())
     }
 }
4 changes: 2 additions & 2 deletions tests/tests_helper.rs
@@ -91,12 +91,12 @@ pub async fn rabbit_consumer(queue: &str, config: &RabbitConfig) -> Result<(), anyhow::Error> {
     consumer.set_delegate(move |delivery: DeliveryResult| async move {
         let delivery = match delivery {
             Ok(Some(delivery)) => {
-                tracing::info!("delivered {delivery:?}");
+                tracing::info!(delivery=?delivery, "msg has been delivered");
                 delivery
             }
             Ok(None) => return,
             Err(err) => {
-                tracing::error!("failed to consume queue message {err:#?}");
+                tracing::error!(err=?err, "failed to consume queue msg");
                 return;
             }
         };