Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding BufferConfig and interface for specifying CustomAllocator(s) #79

Merged
merged 7 commits into from
Sep 29, 2020
15 changes: 14 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,25 @@ os: linux
dist: xenial
language: rust
jobs:
allow_failures:
- if: rust = nightly
include:
- stage: test
name: "Build and test root: stable"
rust: stable
script:
- cargo build --verbose --all
- cargo test --verbose --all
- stage: test # remove this stage when nightly passes
name: "Build and test root: nightly-2020-09-21"
rust: nightly-2020-09-21
script:
- cargo build --verbose --all
- cargo test --verbose --all
- stage: test
name: "Build and test root: nightly"
rust: nightly
script:
- rustup default nightly-2020-09-21 # remove this line when current external bug is fixed
- cargo build --verbose --all
- cargo test --verbose --all
- stage: test-features
Expand All @@ -36,6 +43,12 @@ jobs:
script:
- cd core
- ./test-feature-matrix.sh
- stage: test-features # remove this stage when nightly passes
rust: nightly-2020-09-21
name: "Test feature matrix: nightly"
script:
- cd core
- ./test-feature-matrix.sh
- stage: deploy
rust: nightly
name: "Github Release"
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ slog = "2"
slog-async = "2"
slog-term = "2"
rustc-hash = "1.1"
hocon = {version = "0.3.5", default-features = false}
hocon = {version = "0.3", default-features = false}
hierarchical_hash_wheel_timer = "1.0"
owning_ref = "0.4"
futures = "0.3"
Expand Down
4 changes: 2 additions & 2 deletions core/src/dispatch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl NetworkConfig {
NetworkConfig {
addr,
transport: Transport::TCP,
buffer_config: BufferConfig::default(),
buffer_config: BufferConfig::new(),
custom_allocator: None,
}
}
Expand Down Expand Up @@ -139,7 +139,7 @@ impl Default for NetworkConfig {
NetworkConfig {
addr: "127.0.0.1:0".parse().unwrap(),
transport: Transport::TCP,
buffer_config: BufferConfig::default(),
Bathtor marked this conversation as resolved.
Show resolved Hide resolved
buffer_config: BufferConfig::new(),
custom_allocator: None,
}
}
Expand Down
75 changes: 43 additions & 32 deletions core/src/net/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ pub struct BufferConfig {
}

impl BufferConfig {
/// Create a new BufferConfig with reasonable values
pub fn default() -> Self {
/// Create a new BufferConfig with default values which may be overwritten
Bathtor marked this conversation as resolved.
Show resolved Hide resolved
pub fn new() -> Self {
BufferConfig {
chunk_size: 128 * 1000, // 128Kb chunks
initial_pool_count: 2, // 256Kb initial/minimum BufferPools
Expand All @@ -42,27 +42,27 @@ impl BufferConfig {
}
}

/// Creates a new BufferConfig with specified values and performs validation
pub fn new(
chunk_size: usize,
initial_pool_count: usize,
max_pool_count: usize,
encode_min_remaining: usize,
) -> Self {
let buffer_config = BufferConfig {
chunk_size,
initial_pool_count,
max_pool_count,
encode_min_remaining,
};
buffer_config.validate();
buffer_config
/// Sets the BufferConfigs chunk_size to the given value
Bathtor marked this conversation as resolved.
Show resolved Hide resolved
pub fn chunk_size(&mut self, size: usize) -> () {
self.chunk_size = size;
}
/// Sets the BufferConfigs initial_pool_count to the given value
pub fn initial_pool_count(&mut self, count: usize) -> () {
self.initial_pool_count = count;
}
/// Sets the BufferConfigs max_pool_count to the given value
pub fn max_pool_count(&mut self, count: usize) -> () {
self.max_pool_count = count;
}
/// Sets the BufferConfigs encode_min_remaining to the given value
pub fn encode_min_remaining(&mut self, size: usize) -> () {
self.encode_min_remaining = size;
}

/// Tries to deserialize a BufferConfig from the specified in the given `config`.
/// Returns a default BufferConfig if it fails to read from the config.
pub fn from_config(config: &Hocon) -> BufferConfig {
let mut buffer_config = BufferConfig::default();
let mut buffer_config = BufferConfig::new();
if let Some(chunk_size) = config["buffer_config"]["chunk_size"].as_i64() {
buffer_config.chunk_size = chunk_size as usize;
}
Expand Down Expand Up @@ -832,14 +832,17 @@ mod tests {
#[should_panic(expected = "chunk_size must be greater than encode_min_remaining")]
fn invalid_encode_min_remaining_validation() {
// The BufferConfig should panic because encode_min_remain too high
let _ = BufferConfig::new(128, 10, 10, 128);
let mut buffer_config = BufferConfig::new();
buffer_config.chunk_size(128);
buffer_config.encode_min_remaining(128);
buffer_config.validate();
}

// This test instantiates an EncodeBuffer and writes the same string into it enough times that
// the EncodeBuffer should overload multiple times and will have to succeed in reusing >=1 Chunk
#[test]
fn encode_buffer_overload_reuse_default_config() {
let buffer_config = BufferConfig::default();
let buffer_config = BufferConfig::new();
let encode_buffer = encode_buffer_overload_reuse(&buffer_config);
// Check the buffer pool sizes
assert_eq!(
Expand All @@ -854,12 +857,12 @@ mod tests {
// As above, except we create a much larger BufferPool with larger Chunks manually configured
#[test]
fn encode_buffer_overload_reuse_manually_configured_large_buffers() {
let buffer_config = BufferConfig::new(
50000000, // 50 MB chunk_size
10, // 500 MB pool init value
20, // 1 GB pool max size
256, // 256 B min_remaining
);
let mut buffer_config = BufferConfig::new();
buffer_config.chunk_size(50000000); // 50 MB chunk_size
buffer_config.initial_pool_count(10); // 500 MB pool init value
buffer_config.max_pool_count(20); // 1 GB pool max size
buffer_config.encode_min_remaining(256);// 256 B min_remaining

let encode_buffer = encode_buffer_overload_reuse(&buffer_config);
assert_eq!(encode_buffer.buffer.len(), 50000000);
// Check the buffer pool sizes
Expand Down Expand Up @@ -944,7 +947,7 @@ mod tests {
#[should_panic(expected = "src too big for buffering")]
fn encode_buffer_panic() {
// Instantiate an encode buffer with default values
let buffer_config = BufferConfig::default();
let buffer_config = BufferConfig::new();
let mut encode_buffer = EncodeBuffer::with_config(&buffer_config, &None);
let buffer_encoder = &mut EncodeBuffer::get_buffer_encoder(&mut encode_buffer);
use std::string::ToString;
Expand All @@ -958,7 +961,7 @@ mod tests {
#[test]
fn aligned_after_drop() {
// Instantiate an encode buffer with default values
let buffer_config = BufferConfig::default();
let buffer_config = BufferConfig::new();
let mut encode_buffer = EncodeBuffer::with_config(&buffer_config, &None);
{
let buffer_encoder = &mut EncodeBuffer::get_buffer_encoder(&mut encode_buffer);
Expand Down Expand Up @@ -1003,10 +1006,13 @@ mod tests {
impl ComponentLifecycle for BufferTestActor {
fn on_start(&mut self) -> Handled {
if self.custom_buf {
let mut buffer_config = BufferConfig::new();
buffer_config.encode_min_remaining(30);
buffer_config.max_pool_count(5);
buffer_config.initial_pool_count(4);
buffer_config.chunk_size(128);
// Initialize the buffers
self.ctx
.borrow()
.init_buffers(Some(BufferConfig::new(128, 4, 5, 30)), None);
self.ctx.borrow().init_buffers(Some(buffer_config), None);
}
// Use the Buffer
let _ = self.ctx.actor_path().clone().tell_serialised(120, self);
Expand All @@ -1023,6 +1029,11 @@ mod tests {
}
fn buffer_config_testing_system() -> KompactSystem {
let mut cfg = KompactConfig::new();
let mut network_buffer_config = BufferConfig::new();
network_buffer_config.chunk_size(512);
network_buffer_config.initial_pool_count(2);
network_buffer_config.max_pool_count(3);
network_buffer_config.encode_min_remaining(10);
cfg.load_config_str(
r#"{
buffer_config {
Expand All @@ -1036,7 +1047,7 @@ mod tests {
cfg.system_components(DeadletterBox::new, {
NetworkConfig::with_buffer_config(
"127.0.0.1:0".parse().expect("Address should work"),
BufferConfig::new(512, 2, 3, 10),
network_buffer_config,
)
.build()
});
Expand Down
1 change: 1 addition & 0 deletions core/src/net/buffer_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ impl BufferPool {
config: &BufferConfig,
custom_allocator: &Option<Arc<dyn ChunkAllocator>>,
) -> Self {
config.validate();
let chunk_allocator = {
if let Some(allocator) = custom_allocator {
allocator.clone()
Expand Down
17 changes: 10 additions & 7 deletions core/src/net/network_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,6 @@ fn bind_with_retries(
mod tests {
use super::*;
use crate::{dispatch::NetworkConfig, prelude::BufferConfig};
use std::net::{IpAddr, Ipv4Addr};

// Cleaner test-cases for manually running the thread
fn poll_and_handle(thread: &mut NetworkThread) -> () {
Expand Down Expand Up @@ -872,8 +871,8 @@ mod tests {
fn merge_connections_basic() -> () {
// Sets up two NetworkThreads and does mutual connection request

let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7778);
let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7780);
let addr1 = "127.0.0.1:7778".parse().expect("Address should work");
let addr2 = "127.0.0.1:7780".parse().expect("Address should work");

let (mut network_thread1, input_queue_1_sender, mut network_thread2, input_queue_2_sender) =
setup_two_threads(addr1, addr2);
Expand Down Expand Up @@ -945,8 +944,8 @@ mod tests {
// Sets up two NetworkThreads and does mutual connection request
// This test uses a different order of events than basic

let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8778);
let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8780);
let addr1 = "127.0.0.1:8778".parse().expect("Address should work");
let addr2 = "127.0.0.1:8780".parse().expect("Address should work");

let (mut network_thread1, input_queue_1_sender, mut network_thread2, input_queue_2_sender) =
setup_two_threads(addr1, addr2);
Expand Down Expand Up @@ -1016,8 +1015,12 @@ mod tests {

#[test]
fn network_thread_custom_buffer_config() -> () {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9788);
let buffer_config = BufferConfig::new(128, 13, 14, 10);
let addr = "127.0.0.1:9788".parse().expect("Address should work");
let mut buffer_config = BufferConfig::new();
buffer_config.chunk_size(128);
buffer_config.max_pool_count(14);
buffer_config.initial_pool_count(13);
buffer_config.encode_min_remaining(10);
let network_config = NetworkConfig::with_buffer_config(addr, buffer_config);
let mut cfg = KompactConfig::new();
cfg.system_components(DeadletterBox::new, NetworkConfig::default().build());
Expand Down