Feat/add rss sources db (#7)
* chore(compose): updated rabbitmq service version in compose

* chore(migrations): added migrations to store rss sources

* feat(storage): implemented storage pkg based on pgsql

* chore(tests): added tests for storage pkg

* chore(config): updated configs to use new storage module

* chore(bins): added new feature to bins and tests

* chore(sqlx): prepared sqlx queries for offline use

* fix(fmt): fixed fmt warnings

* chore(migrations): added additional table fields

* fix(build): fixed feature-gated code so the project builds

* fix(tests): fixed test cases after all changes

---------

Co-authored-by: Bread White <breadrock1@email.net>
breadrock1 and Bread White authored Nov 11, 2024
1 parent 37d35ca commit 31849e0
Showing 25 changed files with 395 additions and 19 deletions.

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -6,6 +6,7 @@ edition = "2021"
[features]
cache-redis = ["dep:redis"]
publish-offline = ["dep:sqlx"]
storage-pgsql = ["dep:sqlx"]
crawler-llm = ["dep:openai_dive", "dep:html2text", "dep:html_editor"]
default = []

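With the new flag in the feature list, builds and tests that exercise the PostgreSQL-backed storage opt in explicitly; a typical invocation (assuming a reachable database for the tests) would be:

    cargo build --features storage-pgsql
    cargo test --features storage-pgsql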
9 changes: 8 additions & 1 deletion config/development.toml
@@ -29,6 +29,13 @@ username = "postgres"
password = "postgres"
max_pool_size = 10

[storage.pgsql]
address = "localhost:5432"
database = "ai-crawler"
username = "postgres"
password = "postgres"
max_pool_size = 10

[crawler.llm]
api_key = "sk-no-key-required"
base_url = "http://localhost:8081/v1"
@@ -37,7 +44,7 @@ base_url = "http://localhost:8081/v1"
max_retries = 3
timeout = 100
interval_secs = 3600
source_name = "NDTV Worlds News"
source_name = "NDTV World News"
target_url = "https://feeds.feedburner.com/ndtvnews-world-news"

# Available rss news sources:
9 changes: 8 additions & 1 deletion config/production.toml
@@ -25,6 +25,13 @@ username = "postgres"
password = "postgres"
max_pool_size = 10

[storage.pgsql]
address = "localhost:5432"
database = "ai-crawler"
username = "postgres"
password = "postgres"
max_pool_size = 10

[crawler.llm]
api_key = "sk-no-key-required"
base_url = "http://llm:8081/v1"
@@ -33,5 +40,5 @@ base_url = "http://llm:8081/v1"
max_retries = 3
timeout = 100
interval_secs = 3600
source_name = "NDTV Worlds News"
source_name = "NDTV World News"
target_url = "https://feeds.feedburner.com/ndtvnews-world-news"
2 changes: 1 addition & 1 deletion docker-compose-test.yaml
@@ -14,7 +14,7 @@ services:
- './redis-data:/data/cacher'

rabbitmq:
- image: rabbitmq:3-management
+ image: rabbitmq:4-management
restart: on-failure
volumes:
- './rabbitmq-data:/var/lib/rabbitmq'
3 changes: 3 additions & 0 deletions migrations/20241111105852_add-rss-sources.down.sql
@@ -0,0 +1,3 @@
-- Add down migration script here

DROP TABLE IF EXISTS rss_sources;
17 changes: 17 additions & 0 deletions migrations/20241111105852_add-rss-sources.up.sql
@@ -0,0 +1,17 @@
-- Add up migration script here

CREATE TABLE IF NOT EXISTS rss_sources(
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
link TEXT NOT NULL,
run_at_launch BOOL NOT NULL DEFAULT false,
max_retries INTEGER NOT NULL DEFAULT 3,
timeout INTEGER NOT NULL DEFAULT 100,
interval_secs INTEGER NOT NULL DEFAULT 3600
);

INSERT INTO rss_sources(name, link, run_at_launch)
VALUES ('NDTV World News', 'https://feeds.feedburner.com/ndtvnews-world-news', true);

INSERT INTO rss_sources(name, link, run_at_launch)
VALUES ('Sky World News', 'https://feeds.skynews.com/feeds/rss/world.xml', false);
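
The storage pkg added in this PR reads this table through sqlx (the queries are prepared offline, per the commit list, and their diff is not rendered above). A hedged sketch of the launch-time lookup this schema implies; RssSourceRow and the exact SQL here are assumptions, not the crate's real code:

    use sqlx::PgPool;

    // Hypothetical row type mirroring the rss_sources columns above.
    #[derive(sqlx::FromRow)]
    struct RssSourceRow {
        id: i32,
        name: String,
        link: String,
        run_at_launch: bool,
        max_retries: i32,
        timeout: i32,
        interval_secs: i32,
    }

    // Only sources flagged run_at_launch are fetched at startup.
    async fn load_at_launch(pool: &PgPool) -> Result<Vec<RssSourceRow>, sqlx::Error> {
        sqlx::query_as::<_, RssSourceRow>(
            "SELECT id, name, link, run_at_launch, max_retries, timeout, interval_secs
             FROM rss_sources
             WHERE run_at_launch = true",
        )
        .fetch_all(pool)
        .await
    }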
35 changes: 34 additions & 1 deletion src/bin/main.rs
@@ -7,6 +7,12 @@ use news_rss::crawler::llm::LlmCrawler;
#[cfg(feature = "publish-offline")]
use news_rss::publish::pgsql::PgsqlPublisher;

#[cfg(feature = "storage-pgsql")]
use news_rss::storage::pgsql;

#[allow(unused_imports)]
use news_rss::feeds::rss_feeds::config::RssConfig;

use news_rss::cache::local::LocalCache;
use news_rss::config::ServiceConfig;
use news_rss::crawler::native::NativeCrawler;
@@ -40,7 +46,11 @@ async fn main() -> Result<(), anyhow::Error> {
#[cfg(feature = "crawler-llm")]
let crawler = build_llm_crawler(&config).await?;

- let rss_config = vec![config.topics().rss()];
+ #[allow(unused_variables)]
+ let rss_config = [config.topics().rss()];
+ #[cfg(feature = "storage-pgsql")]
+ let rss_config = load_topics_from_pgsql(&config).await?;

let rss_workers = rss_config
.into_iter()
.filter_map(|it| RssFeeds::new(it, publish.clone(), cache.clone(), crawler.clone()).ok())
@@ -122,3 +132,26 @@ pub async fn build_llm_crawler(config: &ServiceConfig) -> Result<Arc<LlmCrawler>
let crawler = Arc::new(crawler);
Ok(crawler)
}

#[cfg(feature = "storage-pgsql")]
pub async fn load_topics_from_pgsql(
config: &ServiceConfig,
) -> Result<Vec<RssConfig>, anyhow::Error> {
use news_rss::storage::LoadTopic;

let rss_config = config.topics().rss();

let pgsql_config = config.storage().pgsql();
let storage = pgsql::PgsqlTopicStorage::connect(pgsql_config).await?;
let mut topics = storage
.load_at_launch()
.await?
.into_iter()
.map(RssConfig::from)
.map(|it: RssConfig| (it.target_url().to_owned(), it))
.collect::<HashMap<String, RssConfig>>();

topics.insert(rss_config.target_url().to_owned(), rss_config);
let topics = topics.into_values().collect();
Ok(topics)
}
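
Note the merge order in load_topics_from_pgsql: rows loaded from PostgreSQL are collected into the HashMap first and the statically configured feed is inserted afterwards, so when both sides define the same target_url the TOML entry wins. Deduplication is by target_url, not by name.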
2 changes: 2 additions & 0 deletions src/config.rs
@@ -4,6 +4,7 @@ use crate::feeds::config::TopicsConfig;
use crate::logger::LoggerConfig;
use crate::publish::config::PublishConfig;
use crate::server::config::ServerConfig;
use crate::storage::config::StorageConfig;

use config::{Config, ConfigError, Environment, File};
use getset::Getters;
@@ -21,6 +22,7 @@ pub struct ServiceConfig {
publish: PublishConfig,
topics: TopicsConfig,
crawler: CrawlerConfig,
storage: StorageConfig,
}

impl ServiceConfig {
2 changes: 1 addition & 1 deletion src/crawler/llm/retriever.rs
@@ -93,7 +93,7 @@ fn extract_json_object(repaired: &str) -> Result<SemanticBlock, anyhow::Error> {
mod test_llm_retriever {
use super::*;

- const BROKEN_CNN_JSON: &str = include_str!("../../../tests/resources/cnn-news-llm-resp.txt");
+ const BROKEN_CNN_JSON: &str = include_str!("../../../tests/resources/cnn-json-llm-resp.txt");
const BROKEN_NDTV_JSON: &str = include_str!("../../../tests/resources/ndtv-news-llm-resp.txt");

#[test]
3 changes: 1 addition & 2 deletions src/feeds/config.rs
@@ -1,9 +1,8 @@
use crate::feeds::rss_feeds::config::RssConfig;

- use getset::Getters;
use serde::Deserialize;

- #[derive(Clone, Deserialize, Getters)]
+ #[derive(Clone, Deserialize)]
pub struct TopicsConfig {
rss: RssConfig,
}
1 change: 1 addition & 0 deletions src/lib.rs
@@ -5,6 +5,7 @@ pub mod feeds;
pub mod logger;
pub mod publish;
pub mod server;
pub mod storage;

#[async_trait::async_trait]
pub trait ServiceConnect {
12 changes: 12 additions & 0 deletions src/storage/config.rs
@@ -0,0 +1,12 @@
#[cfg(feature = "storage-pgsql")]
use crate::storage::pgsql::config::PgsqlTopicStorageConfig;

use getset::Getters;
use serde::Deserialize;

#[derive(Clone, Deserialize, Getters)]
#[getset(get = "pub")]
pub struct StorageConfig {
#[cfg(feature = "storage-pgsql")]
pgsql: PgsqlTopicStorageConfig,
}
13 changes: 13 additions & 0 deletions src/storage/mod.rs
@@ -0,0 +1,13 @@
pub mod config;

#[cfg(feature = "storage-pgsql")]
pub mod pgsql;

#[async_trait::async_trait]
pub trait LoadTopic {
type Error;
type Topic;

async fn load_all(&self) -> Result<Vec<Self::Topic>, Self::Error>;
async fn load_at_launch(&self) -> Result<Vec<Self::Topic>, Self::Error>;
}
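
The sqlx-backed implementor of this trait lives in src/storage/pgsql, whose diff is not rendered above. As a minimal sketch of how the trait is meant to be bound, here is a hypothetical in-memory implementor; everything in it is illustrative, not the crate's real pgsql code:

    struct InMemoryStorage {
        topics: Vec<String>,
    }

    #[async_trait::async_trait]
    impl LoadTopic for InMemoryStorage {
        type Error = anyhow::Error;
        type Topic = String;

        // load_all returns every stored topic.
        async fn load_all(&self) -> Result<Vec<Self::Topic>, Self::Error> {
            Ok(self.topics.clone())
        }

        // The real storage would filter on the run_at_launch column here;
        // this stub simply returns everything.
        async fn load_at_launch(&self) -> Result<Vec<Self::Topic>, Self::Error> {
            Ok(self.topics.clone())
        }
    }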
14 changes: 14 additions & 0 deletions src/storage/pgsql/config.rs
@@ -0,0 +1,14 @@
use getset::{CopyGetters, Getters};
use serde::Deserialize;

#[derive(Clone, Deserialize, Getters, CopyGetters)]
#[getset(get = "pub")]
pub struct PgsqlTopicStorageConfig {
address: String,
database: String,
username: String,
password: String,
#[getset(skip)]
#[getset(get_copy = "pub")]
max_pool_size: u32,
}
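
The ServiceConnect implementation that consumes this config is also in the non-rendered part of the diff. A hedged sketch of how these fields plausibly combine into a connection pool, assuming a conventional postgres:// URL where `address` already carries host:port as in the TOML above (the exact format the crate uses is not shown here):

    use sqlx::postgres::PgPoolOptions;
    use sqlx::PgPool;

    // Assumed URL shape: postgres://user:password@address/database.
    async fn connect(config: &PgsqlTopicStorageConfig) -> Result<PgPool, sqlx::Error> {
        let url = format!(
            "postgres://{}:{}@{}/{}",
            config.username(),
            config.password(),
            config.address(),
            config.database(),
        );
        PgPoolOptions::new()
            .max_connections(config.max_pool_size())
            .connect(&url)
            .await
    }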
