Skip to content

Commit

Permalink
feat!: Fetch SuiteQL results more quickly using threads
Browse files Browse the repository at this point in the history
Function names and signatures have changed. Tread lightly!
  • Loading branch information
jacobsvante committed Nov 3, 2021
1 parent 7fa92f4 commit f4a2689
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 85 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ serde_json = { version = "1.0.68", features = ["preserve_order"] }
configparser = "3.0.0"
dirs = "4.0.0"
tiny_http = { version = "0.9.0", optional = true }
itertools = "0.10.1"

[dev-dependencies]
httpmock = "0.6.2"
rstest = "0.11.0"
tiny_http = { version = "0.9.0" }

[features]
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ let api = RestApi::new(config);
# let api = RestApi::new(Config::new("123456", "2", "3", "4", "5")).with_base_url(server.base_url());;
# let mock = server.mock(|when, then| {
# when.method(POST).path("/query/v1/suiteql");
# then.status(200).body(r#"{"links":[{"rel":"next","href":"https://123456.suitetalk.api.netsuite.com/services/rest/query/v1/suiteql?limit=2&offset=2"},{"rel":"last","href":"https://123456.suitetalk.api.netsuite.com/services/rest/query/v1/suiteql?limit=2&offset=1998"},{"rel":"self","href":"https://123456.suitetalk.api.netsuite.com/services/rest/query/v1/suiteql?limit=2"}],"count":2,"hasMore":false,"items":[{"links":[],"currency":"1","internalid":"24","item":"24","pricelevel":"15","quantity":"1","saleunit":"1","unitprice":"95.49"},{"links":[],"currency":"1","internalid":"24","item":"24","pricelevel":"21","quantity":"1","saleunit":"1","unitprice":"19.99"}],"offset":0,"totalResults":2000}"#);
# then.status(200).body(r#"{"links":[],"count":2,"hasMore":false,"items":[{"links":[],"currency":"1","internalid":"24","item":"24","pricelevel":"15","quantity":"1","saleunit":"1","unitprice":"95.49"},{"links":[],"currency":"1","internalid":"24","item":"24","pricelevel":"21","quantity":"1","saleunit":"1","unitprice":"19.99"}],"offset":0,"totalResults":2}"#);
# });
let res = api.suiteql.fetch_all::<Price>("SELECT * FROM pricing");
# mock.assert();
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ mod suiteql;
pub use config::*;
pub use error::*;
pub use rest_api::*;
pub use suiteql::*;
105 changes: 83 additions & 22 deletions src/suiteql.rs
Original file line number Diff line number Diff line change
@@ -1,73 +1,134 @@
use core::num::NonZeroU8;
use std::collections::VecDeque;
use std::thread;

use http::Method;
use itertools::Itertools;
use log::debug;
use serde_json::Value;

use crate::error::Error;
use crate::params::Params;
use crate::requester::Requester;
use crate::response::Response;
use http::Method;

#[derive(Debug, Clone)]
pub struct SuiteQl {
requester: Requester,
limit: usize,
default_limit: usize,
threads: NonZeroU8,
}

impl SuiteQl {
pub fn new(requester: Requester) -> Self {
Self {
requester,
limit: 1000,
default_limit: 1000,
threads: NonZeroU8::new(10).unwrap(),
}
}

pub fn set_limit(&mut self, limit: usize) {
self.limit = limit;
pub fn set_default_limit(&mut self, default_limit: usize) {
self.default_limit = default_limit;
}

pub fn set_threads(&mut self, threads: NonZeroU8) {
self.threads = threads;
}

pub fn raw(&self, query: &str, limit: usize, offset: usize) -> Result<Response, Error> {
pub fn raw(&self, query: &str, limit: usize, offset: usize) -> Result<SuiteQlResponse, Error> {
let mut params = Params::new();
params.push("limit", limit);
params.push("offset", offset);
let mut headers = Params::new();
let payload = SuiteQlPayload { q: query };
let payload = serde_json::to_string(&payload)?;
headers.push("Prefer", "transient");
self.requester.request(
let res = self.requester.request(
Method::POST,
"query/v1/suiteql",
Some(params),
Some(headers),
Some(&payload),
)
)?;
Ok(serde_json::from_str::<SuiteQlResponse>(res.body())?)
}

pub fn fetch_all<T: serde::de::DeserializeOwned>(&self, query: &str) -> Result<Vec<T>, Error> {
let mut collected = Vec::new();
for i in 0.. {
let res = self.raw(query, self.limit, self.limit * i)?;
let res: SuiteQlResponse = serde_json::from_str(res.body())?;
let parsed: Vec<T> = serde_json::from_value(res.items)?;
collected.extend(parsed);
if !res.has_more {
break;
pub fn fetch_values(&self, query: &str) -> Result<Vec<Value>, Error> {
let limit = self.default_limit;

let res = self.raw(query, limit, 0)?;
let total_results = res.total_results;
let mut collected = res.items;

if limit >= total_results {
return Ok(collected);
}

let additional_requests =
(total_results / limit) - 1 + (if total_results % limit > 0 { 1 } else { 0 });

let num_threads: u8 = self.threads.into();
let mut handles = VecDeque::with_capacity(num_threads.into());

debug!("Fetched initial SuiteQL result set. {} total results. Now doing {} additional requests, on {} threads", total_results, additional_requests, num_threads);

for req_no_chunk in &(1..=additional_requests).chunks(num_threads.into()) {
for req_no in req_no_chunk {
let offset = limit * req_no;
let api = self.clone();
let q = query.to_string();
let builder = thread::Builder::new().name(format!("thread-req-no-{}", req_no));
let handle = builder.spawn(move || {
let res = api.raw(&q, limit, offset);
debug!("Retrieved response #{}/{}", req_no, additional_requests);
res
})?;
handles.push_back(handle);
}

while let Some(handle) = handles.pop_front() {
let mut res = handle.join().unwrap()?;
collected.append(&mut res.items);
}
debug_assert_eq!(handles.len(), 0);
}
Ok(collected)
}

pub fn fetch_all<T: serde::de::DeserializeOwned>(&self, query: &str) -> Result<Vec<T>, Error> {
let values = self.fetch_values(query)?;
let mut typed: Vec<T> = Vec::with_capacity(values.len());
for value in values {
typed.push(serde_json::from_value(value)?)
}
Ok(typed)
}
}

#[derive(Debug, serde::Deserialize)]
#[derive(Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)]
struct Link {
rel: String,
href: String,
}

#[derive(Debug, serde::Deserialize)]
#[derive(Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct SuiteQlResponse {
pub struct SuiteQlResponse {
links: Vec<Link>,
count: usize,
has_more: bool,
total_results: usize,
items: serde_json::value::Value,
items: Vec<Value>,
}

impl SuiteQlResponse {
pub fn into_typed_items<T: serde::de::DeserializeOwned>(self) -> Result<Vec<T>, Error> {
let mut vec: Vec<T> = Vec::with_capacity(self.items.len());
for v in self.items {
vec.push(serde_json::from_value(v)?);
}
Ok(vec)
}
}

#[derive(serde::Serialize)]
Expand Down
169 changes: 107 additions & 62 deletions tests/suiteql.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,51 @@
use httpmock::{Method::POST, MockServer};
use httpmock::{Method::POST, Mock, MockServer};
use itertools::Itertools;
use netsuite::{Config, RestApi};
use std::sync::Once;
use rstest::rstest;
use std::{ops::RangeInclusive, sync::Once};

#[derive(Debug, PartialEq, serde::Deserialize)]
fn make_prices(range: RangeInclusive<usize>) -> Vec<Price> {
let mut prices = Vec::new();
for p in range {
prices.push(Price {
unitprice: p.to_string(),
})
}
prices
}

fn price_bodies(count: usize, limit: usize) -> Vec<Body> {
let mut bodies = Vec::with_capacity(count / limit + 1);
let mut num_created = 0_usize;
for chunk in &(1..=count).chunks(limit) {
let mut iter = chunk.map(|i| i);
let start = iter.next().unwrap();
let range = RangeInclusive::new(start, iter.last().unwrap_or(start));
let prices = make_prices(range);
num_created += prices.len();
let body = Body {
links: [],
count: prices.len(),
has_more: num_created < count,
total_results: count,
items: prices,
};
bodies.push(body);
}
bodies
}

#[derive(Debug, PartialEq, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
struct Body {
links: [bool; 0],
count: usize,
has_more: bool,
total_results: usize,
items: Vec<Price>,
}

#[derive(Debug, PartialEq, serde::Deserialize, serde::Serialize)]
struct Price {
unitprice: String,
}
Expand All @@ -20,77 +63,79 @@ fn make_api(server: &MockServer) -> RestApi {
RestApi::new(config).with_base_url(server.base_url())
}

#[test]
fn raw_no_params() {
#[rstest]
fn fetch_values(
#[values(1, 1001, 2479, 10495)] n_results: usize,
#[values(1000, 2000)] limit: usize,
) {
ensure_logging();
let server = MockServer::start();
let api = make_api(&server);
let mock = server.mock(|when, then| {
when.method(POST)
.path("/query/v1/suiteql")
.query_param("limit", "1000")
.query_param("offset", "0");
then.status(200).body("test");
});
let res = api.suiteql.raw("SELECT * FROM pricing", 1000, 0);
mock.assert();
assert_eq!(res.unwrap().body(), "test");
}

#[test]
fn raw_limit_param() {
ensure_logging();
let server = MockServer::start();
let api = make_api(&server);
let mock = server.mock(|when, then| {
when.method(POST)
.path("/query/v1/suiteql")
.query_param("limit", "2")
.query_param("offset", "0");
then.status(200).body("test");
});
let res = api.suiteql.raw("SELECT * FROM pricing", 2, 0);
mock.assert();
assert_eq!(res.unwrap().body(), "test");
let api = {
let mut api = make_api(&server);
api.suiteql.set_default_limit(limit);
api
};

let mocks: Vec<Mock> = price_bodies(n_results, limit)
.iter()
.enumerate()
.map(|(i, body)| {
server.mock(|when, then| {
when.method(POST)
.path("/query/v1/suiteql")
.query_param("limit", &limit.to_string())
.query_param("offset", &(i * limit).to_string());
then.status(200).json_body_obj(body);
})
})
.collect();

let res = api.suiteql.fetch_values("SELECT * FROM pricing");
let values = res.unwrap();
assert_eq!(values.len(), n_results);
for mock in mocks {
mock.assert();
}
let expected_prices = make_prices(1..=n_results);
let expected_values: Vec<serde_json::Value> = expected_prices
.into_iter()
.map(|p| serde_json::to_value(p).unwrap())
.collect();
assert_eq!(values, expected_values);
}

#[test]
fn no_params() {
#[rstest]
fn fetch_all(#[values(1, 1001, 2479, 10495)] n_results: usize, #[values(1000, 2000)] limit: usize) {
ensure_logging();

let server = MockServer::start();
let api = {
let mut api = make_api(&server);
api.suiteql.set_limit(2);
api.suiteql.set_default_limit(limit);
api
};
let mock1 = server.mock(|when, then| {
when.method(POST).path("/query/v1/suiteql").query_param("offset", "0");
then.status(200).body(r#"{"links": [], "count": 2, "hasMore": true, "totalResults": 4, "items": [{"unitprice": "1"}, {"unitprice": "2"}]}"#);
});
let mock2 = server.mock(|when, then| {
when.method(POST).path("/query/v1/suiteql").query_param("offset", "2");
then.status(200).body(r#"{"links": [], "count": 2, "hasMore": false, "totalResults": 4, "items": [{"unitprice": "3"}, {"unitprice": "4"}]}"#);
});

let mocks: Vec<Mock> = price_bodies(n_results, limit)
.iter()
.enumerate()
.map(|(i, body)| {
server.mock(|when, then| {
when.method(POST)
.path("/query/v1/suiteql")
.query_param("limit", &limit.to_string())
.query_param("offset", &(i * limit).to_string());
then.status(200).json_body_obj(body);
})
})
.collect();

let res = api.suiteql.fetch_all::<Price>("SELECT * FROM pricing");
let prices = res.unwrap();
mock1.assert();
mock2.assert();
assert_eq!(
prices,
[
Price {
unitprice: "1".into()
},
Price {
unitprice: "2".into()
},
Price {
unitprice: "3".into()
},
Price {
unitprice: "4".into()
}
]
);
assert_eq!(prices.len(), n_results);
for mock in mocks {
mock.assert();
}
let expected_prices = make_prices(1..=n_results);
assert_eq!(prices, expected_prices);
}

0 comments on commit f4a2689

Please sign in to comment.