Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify FFI calls with an example on C# #114

Draft
wants to merge 2 commits into
base: csharp/integ_yuryf_rust
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions csharp/lib/AsyncClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ public AsyncClient(string host, UInt32 port, bool useTLS)
public async Task SetAsync(string key, string value)
{
var message = messageContainer.GetMessageForCall(key, value);
SetFfi(clientPointer, (ulong)message.Index, message.KeyPtr, message.ValuePtr);
Command(clientPointer, (ulong)message.Index, RequestType.SetString, 2UL, new IntPtr[] { message.KeyPtr, message.ValuePtr });
//SetFfi(clientPointer, (ulong)message.Index, );
await message;
}

public async Task<string?> GetAsync(string key)
{
var message = messageContainer.GetMessageForCall(key, null);
GetFfi(clientPointer, (ulong)message.Index, message.KeyPtr);
//GetFfi(clientPointer, (ulong)message.Index, message.KeyPtr);
Command(clientPointer, (ulong)message.Index, RequestType.GetString, 1UL, new IntPtr[] { message.KeyPtr });
return await message;
}

Expand All @@ -54,6 +56,7 @@ public void Dispose()
private void SuccessCallback(ulong index, IntPtr str)
{
var result = str == IntPtr.Zero ? null : Marshal.PtrToStringAnsi(str);
Console.WriteLine($" - SuccessCallback {result}");
// Work needs to be offloaded from the calling thread, because otherwise we might starve the client's thread pool.
Task.Run(() =>
{
Expand All @@ -64,6 +67,7 @@ private void SuccessCallback(ulong index, IntPtr str)

private void FailureCallback(ulong index)
{
Console.WriteLine(" - FailureCallback");
// Work needs to be offloaded from the calling thread, because otherwise we might starve the client's thread pool.
Task.Run(() =>
{
Expand Down Expand Up @@ -94,6 +98,16 @@ private void FailureCallback(ulong index)

#region FFI function declarations

public enum RequestType : uint

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove that and share the RequestType in the header file?
I'm not sure we need to convert from RequestType to RequestType to Cmd when it's 1:1:1.

Copy link
Author

@Yury-Fridlyand Yury-Fridlyand Feb 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is possible. We can define the enum into a C header file and then import it to C# project and to rust (using bindgen).
We can also use protoc to do this for us and suppress a warning about not-FFI-safe enum.

{
// copied from redis_request.proto
CustomCommand = 1,
GetString = 2,
SetString = 3,
Ping = 4,
Info = 5,
}

private delegate void StringAction(ulong index, IntPtr str);
private delegate void FailureAction(ulong index);
[DllImport("libglide_rs", CallingConvention = CallingConvention.Cdecl, EntryPoint = "get")]
Expand All @@ -102,6 +116,9 @@ private void FailureCallback(ulong index)
[DllImport("libglide_rs", CallingConvention = CallingConvention.Cdecl, EntryPoint = "set")]
private static extern void SetFfi(IntPtr client, ulong index, IntPtr key, IntPtr value);

[DllImport("libglide_rs", CallingConvention = CallingConvention.Cdecl, EntryPoint = "command")]
private static extern void Command(IntPtr client, ulong index, RequestType requestType, ulong argCount, IntPtr[] args);

private delegate void IntAction(IntPtr arg);
[DllImport("libglide_rs", CallingConvention = CallingConvention.Cdecl, EntryPoint = "create_client")]
private static extern IntPtr CreateClientFfi(String host, UInt32 port, bool useTLS, IntPtr successCallback, IntPtr failureCallback);
Expand Down
253 changes: 252 additions & 1 deletion csharp/lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
*/
use glide_core::connection_request;
use glide_core::{client::Client as GlideClient, connection_request::NodeAddress};
use redis::{Cmd, FromRedisValue, RedisResult};
use redis::{Cmd, cmd, FromRedisValue, RedisResult, Value};
use std::{
ffi::{c_void, CStr, CString},
os::raw::c_char,
Expand Down Expand Up @@ -93,6 +93,257 @@ pub extern "C" fn close_client(client_ptr: *const c_void) {
drop(client_ptr);
}

// Cannot use glide_core::redis_request::RequestType, because it is not FFI safe
#[repr(u32)]
pub enum RequestType {
// copied from redis_request.proto
CustomCommand = 1,
GetString = 2,
SetString = 3,
Ping = 4,
Info = 5,
Del = 6,
Select = 7,
ConfigGet = 8,
ConfigSet = 9,
ConfigResetStat = 10,
ConfigRewrite = 11,
ClientGetName = 12,
ClientGetRedir = 13,
ClientId = 14,
ClientInfo = 15,
ClientKill = 16,
ClientList = 17,
ClientNoEvict = 18,
ClientNoTouch = 19,
ClientPause = 20,
ClientReply = 21,
ClientSetInfo = 22,
ClientSetName = 23,
ClientUnblock = 24,
ClientUnpause = 25,
Expire = 26,
HashSet = 27,
HashGet = 28,
HashDel = 29,
HashExists = 30,
MGet=31,
MSet=32,
Incr=33,
IncrBy=34,
Decr=35,
IncrByFloat=36,
DecrBy=37,
HashGetAll=38,
HashMSet=39,
HashMGet=40,
HashIncrBy = 41,
HashIncrByFloat = 42,
LPush = 43,
LPop = 44,
RPush = 45,
RPop = 46,
LLen = 47,
LRem = 48,
LRange = 49,
LTrim = 50,
SAdd = 51,
SRem = 52,
SMembers = 53,
SCard = 54,
PExpireAt = 55,
PExpire = 56,
ExpireAt = 57,
Exists = 58,
Unlink = 59,
TTL = 60,
Zadd = 61,
Zrem = 62,
Zrange = 63,
Zcard = 64,
Zcount = 65,
ZIncrBy = 66,
ZScore = 67,
Type = 68,
HLen = 69,
Echo = 70,
ZPopMin = 71,
Strlen = 72,
Lindex = 73,
ZPopMax = 74,
XRead = 75,
XAdd = 76,
XReadGroup = 77,
XAck = 78,
XTrim = 79,
XGroupCreate = 80,
XGroupDestroy = 81,
}

// copied from glide_core::socket_listener::get_command
fn get_command(request_type: RequestType) -> Option<Cmd> {
match request_type {
//RequestType::InvalidRequest => None,
RequestType::CustomCommand => Some(Cmd::new()),
RequestType::GetString => Some(cmd("GET")),
RequestType::SetString => Some(cmd("SET")),
RequestType::Ping => Some(cmd("PING")),
RequestType::Info => Some(cmd("INFO")),
RequestType::Del => Some(cmd("DEL")),
RequestType::Select => Some(cmd("SELECT")),
RequestType::ConfigGet => Some(get_two_word_command("CONFIG", "GET")),
RequestType::ConfigSet => Some(get_two_word_command("CONFIG", "SET")),
RequestType::ConfigResetStat => Some(get_two_word_command("CONFIG", "RESETSTAT")),
RequestType::ConfigRewrite => Some(get_two_word_command("CONFIG", "REWRITE")),
RequestType::ClientGetName => Some(get_two_word_command("CLIENT", "GETNAME")),
RequestType::ClientGetRedir => Some(get_two_word_command("CLIENT", "GETREDIR")),
RequestType::ClientId => Some(get_two_word_command("CLIENT", "ID")),
RequestType::ClientInfo => Some(get_two_word_command("CLIENT", "INFO")),
RequestType::ClientKill => Some(get_two_word_command("CLIENT", "KILL")),
RequestType::ClientList => Some(get_two_word_command("CLIENT", "LIST")),
RequestType::ClientNoEvict => Some(get_two_word_command("CLIENT", "NO-EVICT")),
RequestType::ClientNoTouch => Some(get_two_word_command("CLIENT", "NO-TOUCH")),
RequestType::ClientPause => Some(get_two_word_command("CLIENT", "PAUSE")),
RequestType::ClientReply => Some(get_two_word_command("CLIENT", "REPLY")),
RequestType::ClientSetInfo => Some(get_two_word_command("CLIENT", "SETINFO")),
RequestType::ClientSetName => Some(get_two_word_command("CLIENT", "SETNAME")),
RequestType::ClientUnblock => Some(get_two_word_command("CLIENT", "UNBLOCK")),
RequestType::ClientUnpause => Some(get_two_word_command("CLIENT", "UNPAUSE")),
RequestType::Expire => Some(cmd("EXPIRE")),
RequestType::HashSet => Some(cmd("HSET")),
RequestType::HashGet => Some(cmd("HGET")),
RequestType::HashDel => Some(cmd("HDEL")),
RequestType::HashExists => Some(cmd("HEXISTS")),
RequestType::MSet => Some(cmd("MSET")),
RequestType::MGet => Some(cmd("MGET")),
RequestType::Incr => Some(cmd("INCR")),
RequestType::IncrBy => Some(cmd("INCRBY")),
RequestType::IncrByFloat => Some(cmd("INCRBYFLOAT")),
RequestType::Decr => Some(cmd("DECR")),
RequestType::DecrBy => Some(cmd("DECRBY")),
RequestType::HashGetAll => Some(cmd("HGETALL")),
RequestType::HashMSet => Some(cmd("HMSET")),
RequestType::HashMGet => Some(cmd("HMGET")),
RequestType::HashIncrBy => Some(cmd("HINCRBY")),
RequestType::HashIncrByFloat => Some(cmd("HINCRBYFLOAT")),
RequestType::LPush => Some(cmd("LPUSH")),
RequestType::LPop => Some(cmd("LPOP")),
RequestType::RPush => Some(cmd("RPUSH")),
RequestType::RPop => Some(cmd("RPOP")),
RequestType::LLen => Some(cmd("LLEN")),
RequestType::LRem => Some(cmd("LREM")),
RequestType::LRange => Some(cmd("LRANGE")),
RequestType::LTrim => Some(cmd("LTRIM")),
RequestType::SAdd => Some(cmd("SADD")),
RequestType::SRem => Some(cmd("SREM")),
RequestType::SMembers => Some(cmd("SMEMBERS")),
RequestType::SCard => Some(cmd("SCARD")),
RequestType::PExpireAt => Some(cmd("PEXPIREAT")),
RequestType::PExpire => Some(cmd("PEXPIRE")),
RequestType::ExpireAt => Some(cmd("EXPIREAT")),
RequestType::Exists => Some(cmd("EXISTS")),
RequestType::Unlink => Some(cmd("UNLINK")),
RequestType::TTL => Some(cmd("TTL")),
RequestType::Zadd => Some(cmd("ZADD")),
RequestType::Zrem => Some(cmd("ZREM")),
RequestType::Zrange => Some(cmd("ZRANGE")),
RequestType::Zcard => Some(cmd("ZCARD")),
RequestType::Zcount => Some(cmd("ZCOUNT")),
RequestType::ZIncrBy => Some(cmd("ZINCRBY")),
RequestType::ZScore => Some(cmd("ZSCORE")),
RequestType::Type => Some(cmd("TYPE")),
RequestType::HLen => Some(cmd("HLEN")),
RequestType::Echo => Some(cmd("ECHO")),
RequestType::ZPopMin => Some(cmd("ZPOPMIN")),
RequestType::Strlen => Some(cmd("STRLEN")),
RequestType::Lindex => Some(cmd("LINDEX")),
RequestType::ZPopMax => Some(cmd("ZPOPMAX")),
RequestType::XAck => Some(cmd("XACK")),
RequestType::XAdd => Some(cmd("XADD")),
RequestType::XReadGroup => Some(cmd("XREADGROUP")),
RequestType::XRead => Some(cmd("XREAD")),
RequestType::XGroupCreate => Some(get_two_word_command("XGROUP", "CREATE")),
RequestType::XGroupDestroy => Some(get_two_word_command("XGROUP", "DESTROY")),
RequestType::XTrim => Some(cmd("XTRIM")),
}
}

// copied from glide_core::socket_listener::get_two_word_command
fn get_two_word_command(first: &str, second: &str) -> Cmd {
let mut cmd = cmd(first);
cmd.arg(second);
cmd
}

use std::slice::from_raw_parts;
use std::str::Utf8Error;

pub unsafe fn convert_double_pointer_to_vec(
data: *const *const c_char,
len: usize,
) -> Result<Vec<String>, Utf8Error> {
from_raw_parts(data, len)
.iter()
.map(|arg| CStr::from_ptr(*arg).to_str().map(ToString::to_string))
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use CString

.collect()
}

#[no_mangle]
pub extern "C" fn command(
client_ptr: *const c_void,
callback_index: usize,
command_type: RequestType,
arg_count: usize,
args: *const *const c_char
) {
let client = unsafe { Box::leak(Box::from_raw(client_ptr as *mut Client)) };
// The safety of this needs to be ensured by the calling code. Cannot dispose of the pointer before all operations have completed.
let ptr_address = client_ptr as usize;

let arg_vec = unsafe { convert_double_pointer_to_vec(args, arg_count) }.unwrap(); // TODO check

let mut client_clone = client.client.clone();
client.runtime.spawn(async move {

let mut cmd = get_command(command_type).unwrap(); // TODO check cmd
//print!("{:?}", cmd.args);
cmd.arg(arg_vec);

let result = client_clone.send_command(&cmd, None).await;
let client = unsafe { Box::leak(Box::from_raw(ptr_address as *mut Client)) };
let value = match result {
Ok(value) => value,
Err(err) => {
print!(" === err {:?}\n", err);
unsafe { (client.failure_callback)(callback_index) }; // TODO - report errors,
return;
}
};

//print!(" === val {:?}\n", value.clone());

let result : RedisResult<Option<CString>> = match value {
Value::Nil => Ok(None),
Value::Int(num) => Ok(Some(CString::new(format!("{}", num)).unwrap())),
Value::SimpleString(_) | Value::BulkString(_) => Option::<CString>::from_owned_redis_value(value),
Value::Okay => Ok(Some(CString::new("OK").unwrap())),
Value::Double(num) => Ok(Some(CString::new(format!("{}", num)).unwrap())),
Value::Boolean(bool) => Ok(Some(CString::new(format!("{}", bool)).unwrap())),
_ => todo!()
};

//print!(" === result2 {:?}\n", result);

unsafe {
match result {
Ok(None) => (client.success_callback)(callback_index, std::ptr::null()),
Ok(Some(c_str)) => (client.success_callback)(callback_index, c_str.as_ptr()),
Err(_) => (client.failure_callback)(callback_index), // TODO - report errors
};
}
});
}

/// Expects that key and value will be kept valid until the callback is called.
#[no_mangle]
pub extern "C" fn set(
Expand Down
2 changes: 1 addition & 1 deletion csharp/tests/AsyncClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class AsyncClientTests
[OneTimeSetUp]
public void Setup()
{
Glide.Logger.SetLoggerConfig(Glide.Level.Info);
//Glide.Logger.SetLoggerConfig(Glide.Level.Info);
}

private async Task GetAndSetRandomValues(AsyncClient client)
Expand Down
Loading