Skip to content

Commit

Permalink
Added Async Babushka Client and support for Cluster Mode
Browse files Browse the repository at this point in the history
Modified benchmarking so it can handle multiple clients running.
Added support for cluster mode.
Added POC go-babushka client.
Reformatted project structure for benchmark app.
Added README
  • Loading branch information
SanHalacogluImproving authored Nov 14, 2023
1 parent cccb946 commit fb4f2df
Show file tree
Hide file tree
Showing 12 changed files with 490 additions and 87 deletions.
15 changes: 11 additions & 4 deletions benchmarks/install_and_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,11 @@ function runRustBenchmark(){
}

function runGoBenchmark() {
cd ${BENCH_FOLDER}/../go/benchmarks/main/benchmarkApp
echo "go run main.go --resultsFile=${BENCH_FOLDER}/$1 --concurrentTasks "$concurrentTasks" --dataSize "$2" --clients $chosenClients --host $host --clientCount "$clientCount" $tlsFlag $clusterFlag $portFlag"
go run main.go --resultsFile=${BENCH_FOLDER}/$1 --concurrentTasks "$concurrentTasks" --dataSize "$2" --clients $chosenClients --host $host --clientCount "$clientCount" $tlsFlag $clusterFlag $portFlag
cd ${BENCH_FOLDER}/../go/benchmarks/benchmarkApp
echo "Compiling Go code..."
go build -o main main.go
echo "./main --resultsFile=${BENCH_FOLDER}/$1 --concurrentTasks "$concurrentTasks" --dataSize "$2" --clients $chosenClients --host $host --clientCount "$clientCount" $tlsFlag $clusterFlag $portFlag"
./main --resultsFile=${BENCH_FOLDER}/$1 --concurrentTasks "$concurrentTasks" --dataSize "$2" --clients $chosenClients --host $host --clientCount "$clientCount" $tlsFlag $clusterFlag $portFlag
}


Expand Down Expand Up @@ -205,13 +207,18 @@ do
-go)
runAllBenchmarks=0
runGo=1
chosenClients="babushka"
chosenClients="all"
;;
-go-redis)
runAllBenchmarks=0
runGo=1
chosenClients="go-redis"
;;
-go-babushka)
runAllBenchmarks=0
runGo=1
chosenClients="go-babushka"
;;
-only-socket)
chosenClients="socket"
;;
Expand Down
54 changes: 54 additions & 0 deletions go/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Summary - Go Wrapper

This module contains a POC Go-client wrapper that connects to the `Babushka`-Rust-client. The Rust client connects to
Redis, while this wrapper provides Go-language binding. The objective of this wrapper is to provide a thin-wrapper
language api to enhance performance and limit cpu cycles at scale.

## Organization

The Go client contains the following parts:

1. A Go client: wrapper to Rust-client.
2. An examples script: to sanity test go-babushka and similar go-clients against a Redis host.
3. A benchmark app: to performance benchmark go-babushka and standard Go clients against a Redis host.

## Building

1) Build the Babushka-Core:
Navigate to `babushka/babushka-core`.
Run the command: `cargo build --release`.
2) Build go-babushka within the Go Benchmarks:
Change your current directory to: `babushka/go/benchmarks/babushkaclient`.
Execute the build command: cargo `build --release`.
## Code style


Our project enforces a consistent coding style using go fmt, which is the standard code formatter for Go. To ensure your code conforms to this style, run the following command before every commit: `go fmt ./...`
This command should be executed from the `babushka/go` directory.

## Benchmarks
You can run benchmarks in your Go application at the `babushka/go/benchmarks/benchmarkApp` directory using the executable generated by the Go build process. To do this, first compile your application with the command in the directory:
`go build -o main main.go`. You can set arguments using the args flag like:

```shell
./main -help
./main -resultsFile=output.csv -dataSize "100 1000" -concurrentTasks "10 100" -clients all -host localhost -port 6379 -clientCount "1 5" -tls
```

The following arguments are accepted:
* `resultsFile`: the results output file
* `concurrentTasks`: Number of concurrent tasks
* `clients`: one of: all|go-redis|go-babushka
* `clientCount`: Client count
* `host`: Redis server host url
* `port`: Redis server port number
* `tls`: Redis TLS configured
* `clusterModeEnabled`: Redis cluster mode enabled

### Troubleshooting

* Connection Timeout:
* If you're unable to connect to Redis, check that you are connecting to the correct host, port, and TLS configuration.
* Only server-side certificates are supported by the TLS configured Redis.
* If your Go program has trouble finding the required dynamic library on a Linux system, ensure to set the LD_LIBRARY_PATH environment variable. For different operating systems, the equivalent environment variable should be adjusted accordingly.

3 changes: 3 additions & 0 deletions go/benchmarks/babushkaclient/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[env]
BABUSHKA_NAME = { value = "BabushkaGo", force = true }
BABUSHKA_VERSION = "0.1.0"
25 changes: 25 additions & 0 deletions go/benchmarks/babushkaclient/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
name = "babushkaclient"
version = "0.1.0"
edition = "2021"
build = "build.rs"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[build-dependencies]
cbindgen = "0.20"

[lib]
crate-type = ["cdylib"]

[dependencies]
redis = { path = "../../../submodules/redis-rs/redis", features = ["aio", "tokio-comp", "tls", "tokio-native-tls-comp", "tls-rustls-insecure"] }
babushka = { path = "../../../babushka-core" }
tokio = { version = "^1", features = ["rt", "macros", "rt-multi-thread", "time"] }
num-derive = "0.4.0"
num-traits = "0.2.15"
logger_core = {path = "../../../logger_core"}
tracing-subscriber = "0.3.16"

[profile.release]
lto = true
debug = true
83 changes: 83 additions & 0 deletions go/benchmarks/babushkaclient/babushkaclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package babushkaclient

/*
#cgo LDFLAGS: -L./target/release -lbabushkaclient
#include "lib.h"
void successCallback(char *message, uintptr_t channelPtr);
void failureCallback(uintptr_t channelPtr);
*/
import "C"

import (
"fmt"
"github.com/aws/babushka/go/benchmarks"
"unsafe"
)

type BabushkaRedisClient struct {
coreClient unsafe.Pointer
}

//export successCallback
func successCallback(message *C.char, channelPtr C.uintptr_t) {
goMessage := C.GoString(message)
channelAddress := uintptr(channelPtr)
channel := *(*chan string)(unsafe.Pointer(channelAddress))
channel <- goMessage
}

//export failureCallback
func failureCallback(channelPtr C.uintptr_t) {
panic("Failure for get or set")
}

func (babushkaRedisClient *BabushkaRedisClient) ConnectToRedis(connectionSettings *benchmarks.ConnectionSettings) error {
caddress := C.CString(connectionSettings.Host)
defer C.free(unsafe.Pointer(caddress))

babushkaRedisClient.coreClient = C.create_connection(caddress, C.uint32_t(connectionSettings.Port), C._Bool(connectionSettings.UseSsl), C._Bool(connectionSettings.ClusterModeEnabled), (C.SuccessCallback)(unsafe.Pointer(C.successCallback)), (C.FailureCallback)(unsafe.Pointer(C.failureCallback)))
if babushkaRedisClient.coreClient == nil {
return fmt.Errorf("error connecting to babushkaRedisClient")
}
return nil
}

func (babushkaRedisClient *BabushkaRedisClient) Set(key string, value interface{}) error {
strValue := fmt.Sprintf("%v", value)
ckey := C.CString(key)
cval := C.CString(strValue)
defer C.free(unsafe.Pointer(ckey))
defer C.free(unsafe.Pointer(cval))

result := make(chan string)
chAddress := uintptr(unsafe.Pointer(&result))

C.set(babushkaRedisClient.coreClient, ckey, cval, C.uintptr_t(chAddress))

<-result

return nil
}

func (babushkaRedisClient *BabushkaRedisClient) Get(key string) (string, error) {
ckey := C.CString(key)
defer C.free(unsafe.Pointer(ckey))

result := make(chan string)
chAddress := uintptr(unsafe.Pointer(&result))

C.get(babushkaRedisClient.coreClient, ckey, C.uintptr_t(chAddress))
value := <-result

return value, nil
}

func (babushkaRedisClient *BabushkaRedisClient) CloseConnection() error {
C.close_connection(babushkaRedisClient.coreClient)
return nil
}

func (babushkaRedisClient *BabushkaRedisClient) GetName() string {
return "babushka"
}
12 changes: 12 additions & 0 deletions go/benchmarks/babushkaclient/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use std::env;

fn main() {
let crate_dir = env::var("CARGO_MANIFEST_DIR").unwrap();

cbindgen::Builder::new()
.with_crate(crate_dir)
.with_language(cbindgen::Language::C)
.generate()
.expect("Unable to generate bindings")
.write_to_file("lib.h");
}
160 changes: 160 additions & 0 deletions go/benchmarks/babushkaclient/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
use babushka::client::Client as BabushkaClient;
use babushka::connection_request;
use babushka::connection_request::AddressInfo;
use redis::{Cmd, FromRedisValue, RedisResult};
use std::{
ffi::{c_void, CStr, CString},
os::raw::c_char,
};
use tokio::runtime::Builder;
use tokio::runtime::Runtime;

pub type SuccessCallback = unsafe extern "C" fn(message: *const c_char, channel_address: usize) -> ();
pub type FailureCallback = unsafe extern "C" fn(channel_address: usize) -> ();



pub struct Connection {
connection: BabushkaClient,
success_callback: SuccessCallback,
failure_callback: FailureCallback,
runtime: Runtime,
}

fn create_connection_request(
host: String,
port: u32,
use_tls: bool,
use_cluster_mode: bool,
) -> connection_request::ConnectionRequest {
let mut address_info = AddressInfo::new();
address_info.host = host.to_string().into();
address_info.port = port;
let addresses_info = vec![address_info];
let mut connection_request = connection_request::ConnectionRequest::new();
connection_request.addresses = addresses_info;
connection_request.cluster_mode_enabled = use_cluster_mode;
connection_request.tls_mode = if use_tls {
connection_request::TlsMode::InsecureTls
} else {
connection_request::TlsMode::NoTls
}
.into();

connection_request
}

fn create_connection_internal(
host: *const c_char,
port: u32,
use_tls: bool,
use_cluster_mode: bool,
success_callback: SuccessCallback,
failure_callback: FailureCallback,
) -> RedisResult<Connection> {
let host_cstring = unsafe { CStr::from_ptr(host as *mut c_char) };
let host_string = host_cstring.to_str()?.to_string();
let request = create_connection_request(host_string, port, use_tls, use_cluster_mode);
let runtime = Builder::new_multi_thread()
.enable_all()
.thread_name("Babushka go thread")
.build()?;
let _runtime_handle = runtime.enter();
let connection = runtime.block_on(BabushkaClient::new(request)).unwrap();
Ok(Connection {
connection,
success_callback,
failure_callback,
runtime,
})
}

/// Creates a new connection to the given address. The success callback needs to copy the given string synchronously, since it will be dropped by Rust once the callback returns. All callbacks should be offloaded to separate threads in order not to exhaust the connection's thread pool.
#[no_mangle]
pub extern "C" fn create_connection(
host: *const c_char,
port: u32,
use_tls: bool,
use_cluster_mode: bool,
success_callback: SuccessCallback,
failure_callback: FailureCallback,
) -> *const c_void {
match create_connection_internal(host, port, use_tls, use_cluster_mode, success_callback, failure_callback) {
Err(_) => std::ptr::null(),
Ok(connection) => Box::into_raw(Box::new(connection)) as *const c_void,
}
}

#[no_mangle]
pub extern "C" fn close_connection(connection_ptr: *const c_void) {
let connection_ptr = unsafe { Box::from_raw(connection_ptr as *mut Connection) };
let _runtime_handle = connection_ptr.runtime.enter();
drop(connection_ptr);
}

/// Expects that key and value will be kept valid until the callback is called.
#[no_mangle]
pub extern "C" fn set(
connection_ptr: *const c_void,
key: *const c_char,
value: *const c_char,
channel: usize
) {
let connection = unsafe { Box::leak(Box::from_raw(connection_ptr as *mut Connection)) };
// The safety of this needs to be ensured by the calling code. Cannot dispose of the pointer before all operations have completed.
let ptr_address = connection_ptr as usize;

let key_cstring = unsafe { CStr::from_ptr(key as *mut c_char) };
let value_cstring = unsafe { CStr::from_ptr(value as *mut c_char) };
let mut connection_clone = connection.connection.clone();
connection.runtime.spawn(async move {
let key_bytes = key_cstring.to_bytes();
let value_bytes = value_cstring.to_bytes();
let mut cmd = Cmd::new();
cmd.arg("SET").arg(key_bytes).arg(value_bytes);
let result = connection_clone.req_packed_command(&cmd, None).await;
unsafe {
let client = Box::leak(Box::from_raw(ptr_address as *mut Connection));
match result {
Ok(_) => (client.success_callback)(std::ptr::null(), channel),
Err(_) => (client.failure_callback)(channel),
};
}
});
}

/// Expects that key will be kept valid until the callback is called. If the callback is called with a string pointer, the pointer must
/// be used synchronously, because the string will be dropped after the callback.
#[no_mangle]
pub extern "C" fn get(connection_ptr: *const c_void, key: *const c_char, channel: usize) {
let connection = unsafe { Box::leak(Box::from_raw(connection_ptr as *mut Connection)) };
// The safety of this needs to be ensured by the calling code. Cannot dispose of the pointer before all operations have completed.
let ptr_address = connection_ptr as usize;

let key_cstring = unsafe { CStr::from_ptr(key as *mut c_char) };
let mut connection_clone = connection.connection.clone();
connection.runtime.spawn(async move {
let key_bytes = key_cstring.to_bytes();
let mut cmd = Cmd::new();
cmd.arg("GET").arg(key_bytes);
let result = connection_clone.req_packed_command(&cmd, None).await;
let connection = unsafe { Box::leak(Box::from_raw(ptr_address as *mut Connection)) };
let value = match result {
Ok(value) => value,
Err(_) => {
unsafe { (connection.failure_callback)(channel) };
return;
}
};
let result = Option::<CString>::from_redis_value(&value);

unsafe {
match result {
Ok(None) => (connection.success_callback)(std::ptr::null(), channel),
Ok(Some(c_str)) => (connection.success_callback)(c_str.as_ptr(), channel),
Err(_) => (connection.failure_callback)(channel),
};
}
});
}

4 changes: 2 additions & 2 deletions go/benchmarks/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ func getKeysInSortedOrderForPrint(m map[string]latencyResults) []string {
}

func PrintResultsStdOut(benchmarkConfig *BenchmarkConfig, resultMap map[string]latencyResults, tps float64, resultsFile *os.File) {
writeFileOrPanic(resultsFile, fmt.Sprintf("Client Name: %s, Tasks Count: %d, Data Size: %d, Client Count: %d, TPS: %f\n",
benchmarkConfig.ClientName, benchmarkConfig.TasksCount, benchmarkConfig.DataSize, benchmarkConfig.ClientCount, tps))
writeFileOrPanic(resultsFile, fmt.Sprintf("Client Name: %s, Cluster Mode Enabled: %t, Tasks Count: %d, Data Size: %d, Client Count: %d, TPS: %f\n",
benchmarkConfig.ClientName, benchmarkConfig.IsCluster, benchmarkConfig.TasksCount, benchmarkConfig.DataSize, benchmarkConfig.ClientCount, tps))
keys := getKeysInSortedOrderForPrint(resultMap)
for _, key := range keys {
action := key
Expand Down
Loading

0 comments on commit fb4f2df

Please sign in to comment.