Skip to content

Commit

Permalink
chunk push requests in order not to have too many postgres bindings (…
Browse files Browse the repository at this point in the history
…up to i16::MAX); should fix #67 (#68)
  • Loading branch information
naim94a authored Feb 17, 2023
1 parent c872bb3 commit 11a0162
Showing 1 changed file with 37 additions and 18 deletions.
55 changes: 37 additions & 18 deletions common/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,19 @@ impl Database {
}

pub async fn push_funcs<'a, 'b>(&'b self, user: &'a crate::rpc::RpcHello<'a>, funcs: &'a crate::rpc::PushMetadata<'a>, scores: &[u32]) -> Result<Vec<bool>, anyhow::Error> {
use futures_util::TryStreamExt;

// postgres has a limitation of binding per statement (i16::MAX). Split large push requests into smaller chunks.
const PUSH_FUNC_CHUNK_SIZE: usize = 3000;

let db_id = self.get_or_create_db(user, funcs).await?;

let mut rows = Vec::with_capacity(funcs.funcs.len());
for (func, &score) in funcs.funcs.iter().zip(scores.iter()) {
let mut rows = Vec::with_capacity(funcs.funcs.len().min(PUSH_FUNC_CHUNK_SIZE));
let mut is_new = Vec::with_capacity(funcs.funcs.len());
let conn = &mut self.diesel.get().await?;
let f2 = diesel::alias!(schema::funcs as f2);

for (idx, (func, &score)) in funcs.funcs.iter().zip(scores.iter()).enumerate() {
let name = func.name;
let len = func.func_len as i32;
let chksum = func.hash;
Expand All @@ -286,24 +295,34 @@ impl Database {
schema::funcs::rank.eq(score),
schema::funcs::db_id.eq(db_id),
));
}

let f2 = diesel::alias!(schema::funcs as f2);
let conn = &mut self.diesel.get().await?;
if rows.len() < PUSH_FUNC_CHUNK_SIZE && idx < funcs.funcs.len() - 1 {
continue;
}

let mut current_rows = Vec::with_capacity((funcs.funcs.len() - (idx + 1)).max(PUSH_FUNC_CHUNK_SIZE));
std::mem::swap(&mut current_rows, &mut rows);

diesel::insert_into(schema::funcs::table)
.values(current_rows)
.on_conflict((schema::funcs::chksum, schema::funcs::db_id))
.do_update()
.set((
schema::funcs::name.eq(excluded(schema::funcs::name)),
schema::funcs::metadata.eq(excluded(schema::funcs::metadata)),
schema::funcs::rank.eq(excluded(schema::funcs::rank)),
schema::funcs::update_dt.eq(diesel::dsl::now)
))
.returning(diesel::dsl::not(diesel::dsl::exists(f2.filter(f2.field(schema::funcs::chksum).eq(schema::funcs::chksum))))) // xmax=0 when a new row is created.
.load_stream::<bool>(conn)
.await?
.try_fold(&mut is_new, |acc, item: bool| {
acc.push(item);
futures_util::future::ready(Ok(acc))
})
.await?;
}

let is_new: Vec<bool> = diesel::insert_into(schema::funcs::table)
.values(rows)
.on_conflict((schema::funcs::chksum, schema::funcs::db_id))
.do_update()
.set((
schema::funcs::name.eq(excluded(schema::funcs::name)),
schema::funcs::metadata.eq(excluded(schema::funcs::metadata)),
schema::funcs::rank.eq(excluded(schema::funcs::rank)),
schema::funcs::update_dt.eq(diesel::dsl::now)
))
.returning(diesel::dsl::not(diesel::dsl::exists(f2.filter(f2.field(schema::funcs::chksum).eq(schema::funcs::chksum))))) // xmax=0 when a new row is created.
.get_results::<bool>(conn)
.await?;
Ok(is_new)
}

Expand Down

0 comments on commit 11a0162

Please sign in to comment.