use batch inserts
LetsMelon committed May 22, 2023
1 parent 7d077c8 commit 99010ff
Showing 1 changed file with 35 additions and 14 deletions.
49 changes: 35 additions & 14 deletions src/stores/spar.rs
@@ -5,11 +5,12 @@ use anyhow::Result;
use reqwest::Client;
use serde_json::Value;
use sqlx::types::Uuid;
-use sqlx::PgPool;
+use sqlx::{PgPool, Postgres, QueryBuilder};
use strum::IntoEnumIterator;
use strum_macros::EnumIter;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
+use tokio::time::Instant;

use super::ExecuteCrawler;

@@ -77,7 +78,7 @@ impl SparUrl {
}
}

-#[derive(Debug, serde::Deserialize)]
+#[derive(Debug, serde::Deserialize, Clone)]
pub struct Product {
description: String,
#[serde(rename = "sales-unit")]
@@ -201,6 +202,8 @@ impl ExecuteCrawler for SparCrawl {
category: Self::Category,
products: Vec<(Self::Product, Uuid)>,
) -> Result<()> {
+let mut products_with_id = Vec::with_capacity(products.len());

for (product, document_id) in products {
let product_id: Option<(Uuid,)> =
sqlx::query_as("select sp_id from sp_spar_product where sp_spar_id = $1")
@@ -213,10 +216,10 @@
let brand_name = product.brand.join(";");

let product_id: (Uuid,) = sqlx::query_as("insert into sp_spar_product (sp_spar_id, sp_description, sp_online_shop_url, sp_name, sp_brand, sp_sc_category) values ( $1, $2, $3, $4, $5, $6 ) returning sp_id")
-.bind(product.id_internal)
-.bind(product.description)
-.bind(product.url)
-.bind(product.name)
+.bind(&product.id_internal)
+.bind(&product.description)
+.bind(&product.url)
+.bind(&product.name)
.bind(brand_name)
.bind(category_map.get(&category).unwrap().clone())
.fetch_one(pool)
@@ -226,16 +229,25 @@
}
};

sqlx::query("insert into spr_spar_price (spr_price, spr_sales_unit, spr_price_unit, spr_sp_product, spr_sr_raw) values ( $1, $2, $3, $4, $5 )")
.bind(product.price)
.bind(product.sales_unit)
.bind(product.price_per_unit)
.bind(product_id)
.bind(document_id)
.execute(pool)
.await?;
products_with_id.push((product, document_id, product_id));
}

+let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new("insert into spr_spar_price (spr_price, spr_sales_unit, spr_price_unit, spr_sp_product, spr_sr_raw)");
+
+query_builder.push_values(
+    products_with_id.iter().take(16),
+    |mut b, (product, document_id, product_id)| {
+        b.push_bind(&product.price);
+        b.push_bind(&product.sales_unit);
+        b.push_bind(&product.price_per_unit);
+        b.push_bind(product_id);
+        b.push_bind(document_id);
+    },
+);
+
+let query = query_builder.build();
+query.execute(pool).await?;

Ok(())
}

@@ -256,10 +268,14 @@ impl ExecuteCrawler for SparCrawl {
set.spawn(async move {
let permit = semaphore.acquire().await.unwrap();

println!("start with: {:?}", category);

let products = Self::download_category(crawl_id, client, &pool, category).await;

drop(permit);

println!("finish with: {:?}", category);

(products, category)
});
}
@@ -285,6 +301,8 @@ impl ExecuteCrawler for SparCrawl {
let semaphore = Arc::new(Semaphore::new(20));
let mut set = JoinSet::new();

+let now = Instant::now();

for (category, products) in products_lists {
let semaphore = semaphore.clone();
let pool = pool.clone();
@@ -305,6 +323,9 @@
let _ = res;
}

+let duration = now.elapsed();
+println!("took: {:?} ms", duration.as_millis());

Ok(())
}
}
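
For reference, a minimal standalone sketch of the batch-insert pattern this commit adopts, using sqlx's QueryBuilder::push_values. The table, columns, and row type below are placeholders for illustration, not the crawler's schema.

use sqlx::{PgPool, Postgres, QueryBuilder};

// Hypothetical row type, for illustration only.
struct PriceRow {
    price: f64,
    sales_unit: String,
}

// Insert all rows with a single statement instead of one round trip per row.
async fn insert_prices(pool: &PgPool, rows: &[PriceRow]) -> Result<(), sqlx::Error> {
    // An empty batch would produce invalid SQL, so bail out early.
    if rows.is_empty() {
        return Ok(());
    }

    let mut qb: QueryBuilder<Postgres> =
        QueryBuilder::new("insert into prices (price, sales_unit)");

    // push_values appends " VALUES ($1, $2), ($3, $4), ..." and binds each field in order.
    qb.push_values(rows, |mut b, row| {
        b.push_bind(row.price);
        b.push_bind(&row.sales_unit);
    });

    qb.build().execute(pool).await?;
    Ok(())
}

Note that the committed code batches only the first 16 rows per call (.take(16)); inserting every row in one statement works the same way, as long as the total bind count stays under Postgres's limit of 65535 parameters per statement, beyond which batches have to be chunked.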
