diff --git a/csharp/lib/AsyncClient.cs b/csharp/lib/AsyncClient.cs index 83e3d4c39b..bc3ac15e71 100644 --- a/csharp/lib/AsyncClient.cs +++ b/csharp/lib/AsyncClient.cs @@ -25,14 +25,16 @@ public AsyncClient(string host, UInt32 port, bool useTLS) public async Task SetAsync(string key, string value) { var message = messageContainer.GetMessageForCall(key, value); - SetFfi(clientPointer, (ulong)message.Index, message.KeyPtr, message.ValuePtr); + Command(clientPointer, (ulong)message.Index, RequestType.SetString, 2UL, new IntPtr[] { message.KeyPtr, message.ValuePtr }); + //SetFfi(clientPointer, (ulong)message.Index, ); await message; } public async Task GetAsync(string key) { var message = messageContainer.GetMessageForCall(key, null); - GetFfi(clientPointer, (ulong)message.Index, message.KeyPtr); + //GetFfi(clientPointer, (ulong)message.Index, message.KeyPtr); + Command(clientPointer, (ulong)message.Index, RequestType.GetString, 1UL, new IntPtr[] { message.KeyPtr }); return await message; } @@ -54,6 +56,7 @@ public void Dispose() private void SuccessCallback(ulong index, IntPtr str) { var result = str == IntPtr.Zero ? null : Marshal.PtrToStringAnsi(str); + Console.WriteLine($" - SuccessCallback {result}"); // Work needs to be offloaded from the calling thread, because otherwise we might starve the client's thread pool. Task.Run(() => { @@ -64,6 +67,7 @@ private void SuccessCallback(ulong index, IntPtr str) private void FailureCallback(ulong index) { + Console.WriteLine(" - FailureCallback"); // Work needs to be offloaded from the calling thread, because otherwise we might starve the client's thread pool. Task.Run(() => { @@ -94,6 +98,16 @@ private void FailureCallback(ulong index) #region FFI function declarations + public enum RequestType : uint + { + // copied from redis_request.proto + CustomCommand = 1, + GetString = 2, + SetString = 3, + Ping = 4, + Info = 5, + } + private delegate void StringAction(ulong index, IntPtr str); private delegate void FailureAction(ulong index); [DllImport("libglide_rs", CallingConvention = CallingConvention.Cdecl, EntryPoint = "get")] @@ -102,6 +116,9 @@ private void FailureCallback(ulong index) [DllImport("libglide_rs", CallingConvention = CallingConvention.Cdecl, EntryPoint = "set")] private static extern void SetFfi(IntPtr client, ulong index, IntPtr key, IntPtr value); + [DllImport("libglide_rs", CallingConvention = CallingConvention.Cdecl, EntryPoint = "command")] + private static extern void Command(IntPtr client, ulong index, RequestType requestType, ulong argCount, IntPtr[] args); + private delegate void IntAction(IntPtr arg); [DllImport("libglide_rs", CallingConvention = CallingConvention.Cdecl, EntryPoint = "create_client")] private static extern IntPtr CreateClientFfi(String host, UInt32 port, bool useTLS, IntPtr successCallback, IntPtr failureCallback); diff --git a/csharp/lib/src/lib.rs b/csharp/lib/src/lib.rs index 495d959598..f0a29c6c1e 100644 --- a/csharp/lib/src/lib.rs +++ b/csharp/lib/src/lib.rs @@ -3,7 +3,7 @@ */ use glide_core::connection_request; use glide_core::{client::Client as GlideClient, connection_request::NodeAddress}; -use redis::{Cmd, FromRedisValue, RedisResult}; +use redis::{Cmd, cmd, FromRedisValue, RedisResult, Value}; use std::{ ffi::{c_void, CStr, CString}, os::raw::c_char, @@ -93,6 +93,257 @@ pub extern "C" fn close_client(client_ptr: *const c_void) { drop(client_ptr); } +// Cannot use glide_core::redis_request::RequestType, because it is not FFI safe +#[repr(u32)] +pub enum RequestType { + // copied from redis_request.proto + CustomCommand = 1, + GetString = 2, + SetString = 3, + Ping = 4, + Info = 5, + Del = 6, + Select = 7, + ConfigGet = 8, + ConfigSet = 9, + ConfigResetStat = 10, + ConfigRewrite = 11, + ClientGetName = 12, + ClientGetRedir = 13, + ClientId = 14, + ClientInfo = 15, + ClientKill = 16, + ClientList = 17, + ClientNoEvict = 18, + ClientNoTouch = 19, + ClientPause = 20, + ClientReply = 21, + ClientSetInfo = 22, + ClientSetName = 23, + ClientUnblock = 24, + ClientUnpause = 25, + Expire = 26, + HashSet = 27, + HashGet = 28, + HashDel = 29, + HashExists = 30, + MGet=31, + MSet=32, + Incr=33, + IncrBy=34, + Decr=35, + IncrByFloat=36, + DecrBy=37, + HashGetAll=38, + HashMSet=39, + HashMGet=40, + HashIncrBy = 41, + HashIncrByFloat = 42, + LPush = 43, + LPop = 44, + RPush = 45, + RPop = 46, + LLen = 47, + LRem = 48, + LRange = 49, + LTrim = 50, + SAdd = 51, + SRem = 52, + SMembers = 53, + SCard = 54, + PExpireAt = 55, + PExpire = 56, + ExpireAt = 57, + Exists = 58, + Unlink = 59, + TTL = 60, + Zadd = 61, + Zrem = 62, + Zrange = 63, + Zcard = 64, + Zcount = 65, + ZIncrBy = 66, + ZScore = 67, + Type = 68, + HLen = 69, + Echo = 70, + ZPopMin = 71, + Strlen = 72, + Lindex = 73, + ZPopMax = 74, + XRead = 75, + XAdd = 76, + XReadGroup = 77, + XAck = 78, + XTrim = 79, + XGroupCreate = 80, + XGroupDestroy = 81, +} + +// copied from glide_core::socket_listener::get_command +fn get_command(request_type: RequestType) -> Option { + match request_type { + //RequestType::InvalidRequest => None, + RequestType::CustomCommand => Some(Cmd::new()), + RequestType::GetString => Some(cmd("GET")), + RequestType::SetString => Some(cmd("SET")), + RequestType::Ping => Some(cmd("PING")), + RequestType::Info => Some(cmd("INFO")), + RequestType::Del => Some(cmd("DEL")), + RequestType::Select => Some(cmd("SELECT")), + RequestType::ConfigGet => Some(get_two_word_command("CONFIG", "GET")), + RequestType::ConfigSet => Some(get_two_word_command("CONFIG", "SET")), + RequestType::ConfigResetStat => Some(get_two_word_command("CONFIG", "RESETSTAT")), + RequestType::ConfigRewrite => Some(get_two_word_command("CONFIG", "REWRITE")), + RequestType::ClientGetName => Some(get_two_word_command("CLIENT", "GETNAME")), + RequestType::ClientGetRedir => Some(get_two_word_command("CLIENT", "GETREDIR")), + RequestType::ClientId => Some(get_two_word_command("CLIENT", "ID")), + RequestType::ClientInfo => Some(get_two_word_command("CLIENT", "INFO")), + RequestType::ClientKill => Some(get_two_word_command("CLIENT", "KILL")), + RequestType::ClientList => Some(get_two_word_command("CLIENT", "LIST")), + RequestType::ClientNoEvict => Some(get_two_word_command("CLIENT", "NO-EVICT")), + RequestType::ClientNoTouch => Some(get_two_word_command("CLIENT", "NO-TOUCH")), + RequestType::ClientPause => Some(get_two_word_command("CLIENT", "PAUSE")), + RequestType::ClientReply => Some(get_two_word_command("CLIENT", "REPLY")), + RequestType::ClientSetInfo => Some(get_two_word_command("CLIENT", "SETINFO")), + RequestType::ClientSetName => Some(get_two_word_command("CLIENT", "SETNAME")), + RequestType::ClientUnblock => Some(get_two_word_command("CLIENT", "UNBLOCK")), + RequestType::ClientUnpause => Some(get_two_word_command("CLIENT", "UNPAUSE")), + RequestType::Expire => Some(cmd("EXPIRE")), + RequestType::HashSet => Some(cmd("HSET")), + RequestType::HashGet => Some(cmd("HGET")), + RequestType::HashDel => Some(cmd("HDEL")), + RequestType::HashExists => Some(cmd("HEXISTS")), + RequestType::MSet => Some(cmd("MSET")), + RequestType::MGet => Some(cmd("MGET")), + RequestType::Incr => Some(cmd("INCR")), + RequestType::IncrBy => Some(cmd("INCRBY")), + RequestType::IncrByFloat => Some(cmd("INCRBYFLOAT")), + RequestType::Decr => Some(cmd("DECR")), + RequestType::DecrBy => Some(cmd("DECRBY")), + RequestType::HashGetAll => Some(cmd("HGETALL")), + RequestType::HashMSet => Some(cmd("HMSET")), + RequestType::HashMGet => Some(cmd("HMGET")), + RequestType::HashIncrBy => Some(cmd("HINCRBY")), + RequestType::HashIncrByFloat => Some(cmd("HINCRBYFLOAT")), + RequestType::LPush => Some(cmd("LPUSH")), + RequestType::LPop => Some(cmd("LPOP")), + RequestType::RPush => Some(cmd("RPUSH")), + RequestType::RPop => Some(cmd("RPOP")), + RequestType::LLen => Some(cmd("LLEN")), + RequestType::LRem => Some(cmd("LREM")), + RequestType::LRange => Some(cmd("LRANGE")), + RequestType::LTrim => Some(cmd("LTRIM")), + RequestType::SAdd => Some(cmd("SADD")), + RequestType::SRem => Some(cmd("SREM")), + RequestType::SMembers => Some(cmd("SMEMBERS")), + RequestType::SCard => Some(cmd("SCARD")), + RequestType::PExpireAt => Some(cmd("PEXPIREAT")), + RequestType::PExpire => Some(cmd("PEXPIRE")), + RequestType::ExpireAt => Some(cmd("EXPIREAT")), + RequestType::Exists => Some(cmd("EXISTS")), + RequestType::Unlink => Some(cmd("UNLINK")), + RequestType::TTL => Some(cmd("TTL")), + RequestType::Zadd => Some(cmd("ZADD")), + RequestType::Zrem => Some(cmd("ZREM")), + RequestType::Zrange => Some(cmd("ZRANGE")), + RequestType::Zcard => Some(cmd("ZCARD")), + RequestType::Zcount => Some(cmd("ZCOUNT")), + RequestType::ZIncrBy => Some(cmd("ZINCRBY")), + RequestType::ZScore => Some(cmd("ZSCORE")), + RequestType::Type => Some(cmd("TYPE")), + RequestType::HLen => Some(cmd("HLEN")), + RequestType::Echo => Some(cmd("ECHO")), + RequestType::ZPopMin => Some(cmd("ZPOPMIN")), + RequestType::Strlen => Some(cmd("STRLEN")), + RequestType::Lindex => Some(cmd("LINDEX")), + RequestType::ZPopMax => Some(cmd("ZPOPMAX")), + RequestType::XAck => Some(cmd("XACK")), + RequestType::XAdd => Some(cmd("XADD")), + RequestType::XReadGroup => Some(cmd("XREADGROUP")), + RequestType::XRead => Some(cmd("XREAD")), + RequestType::XGroupCreate => Some(get_two_word_command("XGROUP", "CREATE")), + RequestType::XGroupDestroy => Some(get_two_word_command("XGROUP", "DESTROY")), + RequestType::XTrim => Some(cmd("XTRIM")), + _ => None, + } +} + +// copied from glide_core::socket_listener::get_two_word_command +fn get_two_word_command(first: &str, second: &str) -> Cmd { + let mut cmd = cmd(first); + cmd.arg(second); + cmd +} + +use std::slice::from_raw_parts; +use std::str::Utf8Error; + +pub unsafe fn convert_double_pointer_to_vec( + data: *const *const c_char, + len: usize, +) -> Result, Utf8Error> { + from_raw_parts(data, len) + .iter() + .map(|arg| CStr::from_ptr(*arg).to_str().map(ToString::to_string)) + .collect() +} + +#[no_mangle] +pub extern "C" fn command( + client_ptr: *const c_void, + callback_index: usize, + command_type: RequestType, + arg_count: usize, + args: *const *const c_char +) { + let client = unsafe { Box::leak(Box::from_raw(client_ptr as *mut Client)) }; + // The safety of this needs to be ensured by the calling code. Cannot dispose of the pointer before all operations have completed. + let ptr_address = client_ptr as usize; + + let arg_vec = unsafe { convert_double_pointer_to_vec(args, arg_count) }.unwrap(); // TODO check + + let mut client_clone = client.client.clone(); + client.runtime.spawn(async move { + + let mut cmd = get_command(command_type).unwrap(); // TODO check cmd + cmd.arg(arg_vec); + + let result = client_clone.send_command(&cmd, None).await; + let client = unsafe { Box::leak(Box::from_raw(ptr_address as *mut Client)) }; + let value = match result { + Ok(value) => value, + Err(err) => { + print!(" === err {:?}\n", err); + unsafe { (client.failure_callback)(callback_index) }; // TODO - report errors, + return; + } + }; + + //print!(" === val {:?}\n", value.clone()); + + let result : RedisResult> = match value { + Value::Nil => Ok(None), + Value::Int(num) => Ok(Some(CString::new(format!("{}", num)).unwrap())), + Value::SimpleString(_) | Value::BulkString(_) => Option::::from_owned_redis_value(value), + Value::Okay => Ok(Some(CString::new("OK").unwrap())), + Value::Double(num) => Ok(Some(CString::new(format!("{}", num)).unwrap())), + Value::Boolean(bool) => Ok(Some(CString::new(format!("{}", bool)).unwrap())), + _ => todo!() + }; + + //print!(" === result2 {:?}\n", result); + + unsafe { + match result { + Ok(None) => (client.success_callback)(callback_index, std::ptr::null()), + Ok(Some(c_str)) => (client.success_callback)(callback_index, c_str.as_ptr()), + Err(_) => (client.failure_callback)(callback_index), // TODO - report errors + }; + } + }); +} + /// Expects that key and value will be kept valid until the callback is called. #[no_mangle] pub extern "C" fn set( diff --git a/csharp/tests/AsyncClientTests.cs b/csharp/tests/AsyncClientTests.cs index e9adfdf97b..9c554d3305 100644 --- a/csharp/tests/AsyncClientTests.cs +++ b/csharp/tests/AsyncClientTests.cs @@ -12,7 +12,7 @@ public class AsyncClientTests [OneTimeSetUp] public void Setup() { - Glide.Logger.SetLoggerConfig(Glide.Level.Info); + //Glide.Logger.SetLoggerConfig(Glide.Level.Info); } private async Task GetAndSetRandomValues(AsyncClient client)