Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added RM_Call options #290

Merged
merged 6 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion examples/call.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#[macro_use]
extern crate redis_module;

use redis_module::{Context, RedisError, RedisResult, RedisString};
use redis_module::raw::*;
use redis_module::{
CallOptionsBuilder, CallReply, Context, RedisError, RedisResult, RedisString, RootCallReply,
};

fn call_test(ctx: &Context, _: Vec<RedisString>) -> RedisResult {
let res: String = ctx.call("ECHO", &["TEST"])?.try_into()?;
Expand Down Expand Up @@ -62,6 +65,18 @@ fn call_test(ctx: &Context, _: Vec<RedisString>) -> RedisResult {
));
}

let call_options = CallOptionsBuilder::new().script_mode().errors_as_replies();
let res: RootCallReply = ctx.call_ext::<&[&str; 0], _>("SHUTDOWN", &call_options.build(), &[]);
if res.get_type() != ReplyType::Error {
return Err(RedisError::Str("Failed to set script mode on call_ext"));
}
let error_msg = res.get_string().unwrap();
if !error_msg.contains("not allow") {
return Err(RedisError::String(format!(
"Failed to verify error messages, expected error message to contain 'not allow', error message: '{error_msg}'",
)));
}

Ok("pass".into())
}

Expand Down
176 changes: 176 additions & 0 deletions src/context/call_reply.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
use std::{ffi::c_longlong, ptr::NonNull};

use crate::raw::*;

pub trait CallReply {
/// Return the call reply type
fn get_type(&self) -> ReplyType;

/// Return the reply us rust string,
/// Only relevant to the following types:
/// * String
/// * Error
///
/// A none will also be returned if failed to convert the
/// data into a string (data is binary).
fn get_string(&self) -> Option<String>;

/// Return lenght of the reply,
/// Only relevant to the following types:
/// * String
/// * Error
/// * Array
///
/// Running this function on other type will return 0.
fn len(&self) -> usize;

/// Return the reply at the location of the given index,
/// Only relevant to the following types:
/// * Array
///
/// Running this function on other type will return None.
fn get(&self, index: usize) -> Option<InnerCallReply>;

/// Return an iterator over the elements in the array
/// Only relevant to the following types:
/// * Array
///
/// Running this function on other type will return an empty iterator.
fn iter(&self) -> Box<dyn Iterator<Item = InnerCallReply> + '_>;
iddm marked this conversation as resolved.
Show resolved Hide resolved

/// Return integer value of the reply type
/// Only relevant to the following types:
/// * Integer
///
/// Running this function on other type will return 0.
fn get_int(&self) -> c_longlong;
}

pub struct RootCallReply {
reply: Option<NonNull<RedisModuleCallReply>>,
}

impl RootCallReply {
pub(crate) fn new(reply: *mut RedisModuleCallReply) -> RootCallReply {
RootCallReply {
reply: NonNull::new(reply),
}
}
}

impl CallReply for RootCallReply {
fn get_type(&self) -> ReplyType {
self.reply
.map_or(ReplyType::Unknown, |e| call_reply_type(e.as_ptr()))
}

fn get_string(&self) -> Option<String> {
call_reply_string(self.reply?.as_ptr())
}

fn len(&self) -> usize {
self.reply.map_or(0, |e| call_reply_length(e.as_ptr()))
}

fn get(&self, index: usize) -> Option<InnerCallReply> {
// Redis will verify array boundaries so no need to veirfy it here.
NonNull::new(call_reply_array_element(self.reply?.as_ptr(), index))
.map(|inner_reply| InnerCallReply::new(self, inner_reply))
}

fn iter(&self) -> Box<dyn Iterator<Item = InnerCallReply> + '_> {
Box::new(RootCallReplyIterator {
reply: self,
index: 0,
})
}

fn get_int(&self) -> c_longlong {
self.reply.map_or(0, |e| call_reply_integer(e.as_ptr()))
}
}

impl Drop for RootCallReply {
fn drop(&mut self) {
self.reply.map(|e| free_call_reply(e.as_ptr()));
}
}

pub struct RootCallReplyIterator<'root> {
reply: &'root RootCallReply,
index: usize,
}

impl<'root> Iterator for RootCallReplyIterator<'root> {
type Item = InnerCallReply<'root>;

fn next(&mut self) -> Option<Self::Item> {
let res = self.reply.get(self.index);
if res.is_some() {
self.index += 1;
}
res
}
}

pub struct InnerCallReply<'root> {
root: &'root RootCallReply,
reply: NonNull<RedisModuleCallReply>,
}

impl<'root> InnerCallReply<'root> {
pub(crate) fn new(
root: &'root RootCallReply,
reply: NonNull<RedisModuleCallReply>,
) -> InnerCallReply<'root> {
InnerCallReply { root, reply }
}
}

impl<'a> CallReply for InnerCallReply<'a> {
fn get_type(&self) -> ReplyType {
call_reply_type(self.reply.as_ptr())
}

fn get_string(&self) -> Option<String> {
call_reply_string(self.reply.as_ptr())
}

fn len(&self) -> usize {
call_reply_length(self.reply.as_ptr())
}

fn get(&self, index: usize) -> Option<Self> {
// Redis will verify array boundaries so no need to veirfy it here.
NonNull::new(call_reply_array_element(self.reply.as_ptr(), index))
.map(|inner_reply| Self::new(self.root, inner_reply))
}

fn iter(&self) -> Box<dyn Iterator<Item = InnerCallReply> + '_> {
Box::new(InnerCallReplyIterator {
reply: self,
index: 0,
})
}

fn get_int(&self) -> c_longlong {
call_reply_integer(self.reply.as_ptr())
}
}

pub struct InnerCallReplyIterator<'root, 'curr: 'root> {
reply: &'curr InnerCallReply<'root>,
index: usize,
}

impl<'root, 'curr: 'root> Iterator for InnerCallReplyIterator<'root, 'curr> {
type Item = InnerCallReply<'root>;

fn next(&mut self) -> Option<Self::Item> {
let res = self.reply.get(self.index);
if res.is_some() {
self.index += 1;
}
res
}
}
142 changes: 115 additions & 27 deletions src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use crate::{RedisError, RedisResult, RedisString, RedisValue};
#[cfg(feature = "experimental-api")]
use std::ffi::CStr;

use self::call_reply::RootCallReply;

#[cfg(feature = "experimental-api")]
mod timer;

Expand All @@ -28,6 +30,92 @@ pub mod server_events;

pub mod keys_cursor;

pub mod call_reply;

pub struct CallOptionsBuilder {
options: String,
}

pub struct CallOptions {
options: CString,
}

pub enum CallOptionResp {
Resp2,
Resp3,
Auto,
}

impl CallOptionsBuilder {
pub fn new() -> CallOptionsBuilder {
CallOptionsBuilder {
options: "v".to_string(),
}
}

fn add_flag(&mut self, flag: &str) {
self.options.push_str(flag);
}

/// Enable this option will not allow RM_Call to perform write commands
pub fn no_writes(mut self) -> CallOptionsBuilder {
self.add_flag("W");
self
}

/// Enable this option will run RM_Call is script mode.
/// This mean that Redis will enable the following protections:
/// 1. Not allow running dangerous commands like 'shutdown'
/// 2. Not allow running write commands on OOM or if there are not enough good replica's connected
pub fn script_mode(mut self) -> CallOptionsBuilder {
self.add_flag("S");
self
}

/// Enable this option will perform ACL validation on the user attached to the context that
/// is used to invoke the call.
pub fn verify_acl(mut self) -> CallOptionsBuilder {
self.add_flag("C");
self
}

/// Enable this option will OOM validation before running the command
pub fn verify_oom(mut self) -> CallOptionsBuilder {
self.add_flag("M");
self
}

/// Enable this option will return error as CallReply object instead of setting errno (it is
/// usually recommend to enable it)
pub fn errors_as_replies(mut self) -> CallOptionsBuilder {
self.add_flag("E");
self
}

/// Enable this option will cause the command to be replicaed to the replica and AOF
pub fn replicate(mut self) -> CallOptionsBuilder {
self.add_flag("!");
self
}

/// Allow control the protocol version in which the replies will be returned.
pub fn resp_3(mut self, resp: CallOptionResp) -> CallOptionsBuilder {
match resp {
CallOptionResp::Auto => self.add_flag("0"),
CallOptionResp::Resp2 => (),
CallOptionResp::Resp3 => self.add_flag("3"),
}
self
}

/// Construct a CallOption object that can be used to run commands using call_ext
pub fn build(self) -> CallOptions {
CallOptions {
options: CString::new(self.options).unwrap(), // the data will never contains internal \0 so it is safe to unwrap.
}
}
}

/// `Context` is a structure that's designed to give us a high-level interface to
/// the Redis module API by abstracting away the raw C FFI calls.
pub struct Context {
Expand Down Expand Up @@ -156,7 +244,12 @@ impl Context {
}
}

pub fn call<'a, T: Into<StrCallArgs<'a>>>(&self, command: &str, args: T) -> RedisResult {
fn call_internal<'a, T: Into<StrCallArgs<'a>>, R: From<RootCallReply>>(
&self,
command: &str,
fmt: *const c_char,
args: T,
) -> R {
let mut call_args: StrCallArgs = args.into();
let final_args = call_args.args_mut();

Expand All @@ -166,36 +259,28 @@ impl Context {
p_call(
self.ctx,
cmd.as_ptr(),
raw::FMT,
fmt,
final_args.as_mut_ptr(),
final_args.len(),
)
};
let result = Self::parse_call_reply(reply);
if !reply.is_null() {
raw::free_call_reply(reply);
}
result
}

fn parse_call_reply(reply: *mut raw::RedisModuleCallReply) -> RedisResult {
match raw::call_reply_type(reply) {
raw::ReplyType::Error => Err(RedisError::String(raw::call_reply_string(reply))),
raw::ReplyType::Unknown => Err(RedisError::Str("Error on method call")),
raw::ReplyType::Array => {
let length = raw::call_reply_length(reply);
let mut vec = Vec::with_capacity(length);
for i in 0..length {
vec.push(Self::parse_call_reply(raw::call_reply_array_element(
reply, i,
))?);
}
Ok(RedisValue::Array(vec))
}
raw::ReplyType::Integer => Ok(RedisValue::Integer(raw::call_reply_integer(reply))),
raw::ReplyType::String => Ok(RedisValue::SimpleString(raw::call_reply_string(reply))),
raw::ReplyType::Null => Ok(RedisValue::Null),
}
R::from(RootCallReply::new(reply))
}

pub fn call<'a, T: Into<StrCallArgs<'a>>>(&self, command: &str, args: T) -> RedisResult {
self.call_internal::<_, RedisResult>(command, raw::FMT, args)
}

/// Invoke a command on Redis and return the result
/// Unlike 'call' this API also allow to pass a CallOption to control different aspects
/// of the command invocation.
pub fn call_ext<'a, T: Into<StrCallArgs<'a>>, R: From<RootCallReply>>(
&self,
command: &str,
options: &CallOptions,
args: T,
) -> R {
self.call_internal(command, options.options.as_ptr() as *const c_char, args)
iddm marked this conversation as resolved.
Show resolved Hide resolved
}

#[must_use]
Expand Down Expand Up @@ -289,6 +374,9 @@ impl Context {

Ok(RedisValue::NoReply) => raw::Status::Ok,

Ok(RedisValue::Error(s)) => self.reply_error_string(&s),
Ok(RedisValue::StaticError(s)) => self.reply_error_string(s),

Err(RedisError::WrongArity) => unsafe {
if self.is_keys_position_request() {
// We can't return a result since we don't have a client
Expand Down
Loading