diff --git a/go/api/base_client.go b/go/api/base_client.go index 6086a8f880..cc447f0d7b 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -104,14 +104,18 @@ func (client *baseClient) Close() { client.coreClient = nil } -func (client *baseClient) executeCommand( +func (client *baseClient) executeCommand(requestType C.RequestType, args []string) (*C.struct_CommandResponse, error) { + return client.executeCommandWithRoute(requestType, args, nil) +} + +func (client *baseClient) executeCommandWithRoute( requestType C.RequestType, args []string, + route route, ) (*C.struct_CommandResponse, error) { if client.coreClient == nil { return nil, &ClosingError{"ExecuteCommand failed. The client is closed."} } - var cArgsPtr *C.uintptr_t = nil var argLengthsPtr *C.ulong = nil if len(args) > 0 { @@ -123,6 +127,22 @@ func (client *baseClient) executeCommand( resultChannel := make(chan payload) resultChannelPtr := uintptr(unsafe.Pointer(&resultChannel)) + var routeBytesPtr *C.uchar = nil + var routeBytesCount C.uintptr_t = 0 + if route != nil { + routeProto, err := route.toRoutesProtobuf() + if err != nil { + return nil, &RequestError{"ExecuteCommand failed due to invalid route"} + } + msg, err := proto.Marshal(routeProto) + if err != nil { + return nil, err + } + + routeBytesCount = C.uintptr_t(len(msg)) + routeBytesPtr = (*C.uchar)(C.CBytes(msg)) + } + C.command( client.coreClient, C.uintptr_t(resultChannelPtr), @@ -130,6 +150,8 @@ func (client *baseClient) executeCommand( C.size_t(len(args)), cArgsPtr, argLengthsPtr, + routeBytesPtr, + routeBytesCount, ) payload := <-resultChannel if payload.error != nil { diff --git a/go/api/request_routing_config.go b/go/api/request_routing_config.go new file mode 100644 index 0000000000..3a1c53b124 --- /dev/null +++ b/go/api/request_routing_config.go @@ -0,0 +1,191 @@ +// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 + +package api + +import ( + "fmt" + "strconv" + "strings" + + "github.com/valkey-io/valkey-glide/go/glide/protobuf" +) + +// Request routing basic interface. Please use one of the following: +// - [api.SimpleNodeRoute] +// - [api.SlotIdRoute] +// - [api.SlotKeyRoute] +// - [api.ByAddressRoute] +type route interface { + toRoutesProtobuf() (*protobuf.Routes, error) +} + +type SimpleNodeRoute int + +const ( + // Route request to all nodes. + // Warning: Don't use it with write commands, they could be routed to a replica (RO) node and fail. + AllNodes SimpleNodeRoute = iota + // Route request to all primary nodes. + AllPrimaries + // Route request to a random node. + // Warning: Don't use it with write commands, because they could be randomly routed to a replica (RO) node and fail. + RandomRoute +) + +func (simpleNodeRoute SimpleNodeRoute) toRoutesProtobuf() (*protobuf.Routes, error) { + simpleRouteProto, err := mapSimpleNodeRoute(simpleNodeRoute) + if err != nil { + return nil, err + } + + request := &protobuf.Routes{ + Value: &protobuf.Routes_SimpleRoutes{ + SimpleRoutes: simpleRouteProto, + }, + } + return request, nil +} + +func mapSimpleNodeRoute(simpleNodeRoute SimpleNodeRoute) (protobuf.SimpleRoutes, error) { + switch simpleNodeRoute { + case AllNodes: + return protobuf.SimpleRoutes_AllNodes, nil + case AllPrimaries: + return protobuf.SimpleRoutes_AllPrimaries, nil + case RandomRoute: + return protobuf.SimpleRoutes_Random, nil + default: + return protobuf.SimpleRoutes_Random, &RequestError{"Invalid simple node route"} + } +} + +// Defines type of the node being addressed. +type SlotType int + +const ( + // Address a primary node. + SlotTypePrimary SlotType = iota + // Address a replica node. + SlotTypeReplica +) + +func mapSlotType(slotType SlotType) (protobuf.SlotTypes, error) { + switch slotType { + case SlotTypePrimary: + return protobuf.SlotTypes_Primary, nil + case SlotTypeReplica: + return protobuf.SlotTypes_Replica, nil + default: + return protobuf.SlotTypes_Primary, &RequestError{"Invalid slot type"} + } +} + +// Request routing configuration overrides the [api.ReadFrom] connection configuration. +// If SlotTypeReplica is used, the request will be routed to a replica, even if the strategy is ReadFrom [api.PreferReplica]. +type SlotIdRoute struct { + slotType SlotType + slotID int32 +} + +// - slotType: Defines type of the node being addressed. +// - slotId: Slot number. There are 16384 slots in a Valkey cluster, and each shard manages a slot range. Unless the slot is +// known, it's better to route using [api.SlotTypePrimary]. +func NewSlotIdRoute(slotType SlotType, slotId int32) *SlotIdRoute { + return &SlotIdRoute{slotType: slotType, slotID: slotId} +} + +func (slotIdRoute *SlotIdRoute) toRoutesProtobuf() (*protobuf.Routes, error) { + slotType, err := mapSlotType(slotIdRoute.slotType) + if err != nil { + return nil, err + } + + request := &protobuf.Routes{ + Value: &protobuf.Routes_SlotIdRoute{ + SlotIdRoute: &protobuf.SlotIdRoute{ + SlotType: slotType, + SlotId: slotIdRoute.slotID, + }, + }, + } + return request, nil +} + +// Request routing configuration overrides the [api.ReadFrom] connection configuration. +// If SlotTypeReplica is used, the request will be routed to a replica, even if the strategy is ReadFrom [api.PreferReplica]. +type SlotKeyRoute struct { + slotType SlotType + slotKey string +} + +// - slotType: Defines type of the node being addressed. +// - slotKey: The request will be sent to nodes managing this key. +func NewSlotKeyRoute(slotType SlotType, slotKey string) *SlotKeyRoute { + return &SlotKeyRoute{slotType: slotType, slotKey: slotKey} +} + +func (slotKeyRoute *SlotKeyRoute) toRoutesProtobuf() (*protobuf.Routes, error) { + slotType, err := mapSlotType(slotKeyRoute.slotType) + if err != nil { + return nil, err + } + + request := &protobuf.Routes{ + Value: &protobuf.Routes_SlotKeyRoute{ + SlotKeyRoute: &protobuf.SlotKeyRoute{ + SlotType: slotType, + SlotKey: slotKeyRoute.slotKey, + }, + }, + } + return request, nil +} + +// Routes a request to a node by its address. +type ByAddressRoute struct { + host string + port int32 +} + +// Create a route using hostname/address and port. +// - host: The endpoint of the node. If port is not provided, host should be in the "address:port" format, where address is the +// preferred endpoint as shown in the output of the CLUSTER SLOTS command. +// - port: The port to access the node. If port is not provided, host is assumed to be in the format "address:port". +func NewByAddressRoute(host string, port int32) *ByAddressRoute { + return &ByAddressRoute{host: host, port: port} +} + +// Create a route using address string formatted as "address:port". +func NewByAddressRouteWithHost(host string) (*ByAddressRoute, error) { + split := strings.Split(host, ":") + if len(split) != 2 { + return nil, &RequestError{ + fmt.Sprintf( + "no port provided, or host is not in the expected format 'hostname:port'. Received: %s", host, + ), + } + } + + port, err := strconv.ParseInt(split[1], 10, 32) + if err != nil { + return nil, &RequestError{ + fmt.Sprintf( + "port must be a valid integer. Received: %s", split[1], + ), + } + } + + return &ByAddressRoute{host: split[0], port: int32(port)}, nil +} + +func (byAddressRoute *ByAddressRoute) toRoutesProtobuf() (*protobuf.Routes, error) { + request := &protobuf.Routes{ + Value: &protobuf.Routes_ByAddressRoute{ + ByAddressRoute: &protobuf.ByAddressRoute{ + Host: byAddressRoute.host, + Port: byAddressRoute.port, + }, + }, + } + return request, nil +} diff --git a/go/api/request_routing_config_test.go b/go/api/request_routing_config_test.go new file mode 100644 index 0000000000..f16b6cfcb7 --- /dev/null +++ b/go/api/request_routing_config_test.go @@ -0,0 +1,96 @@ +// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 + +package api + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/valkey-io/valkey-glide/go/glide/protobuf" +) + +func TestSimpleNodeRoute(t *testing.T) { + config := AllNodes + expected := &protobuf.Routes{ + Value: &protobuf.Routes_SimpleRoutes{ + SimpleRoutes: protobuf.SimpleRoutes_AllNodes, + }, + } + + result, err := config.toRoutesProtobuf() + + assert.Equal(t, expected, result) + assert.Nil(t, err) +} + +func TestSlotIdRoute(t *testing.T) { + config := NewSlotIdRoute(SlotTypePrimary, int32(100)) + expected := &protobuf.Routes{ + Value: &protobuf.Routes_SlotIdRoute{ + SlotIdRoute: &protobuf.SlotIdRoute{ + SlotType: protobuf.SlotTypes_Primary, + SlotId: 100, + }, + }, + } + + result, err := config.toRoutesProtobuf() + + assert.Equal(t, expected, result) + assert.Nil(t, err) +} + +func TestSlotKeyRoute(t *testing.T) { + config := NewSlotKeyRoute(SlotTypePrimary, "Slot1") + expected := &protobuf.Routes{ + Value: &protobuf.Routes_SlotKeyRoute{ + SlotKeyRoute: &protobuf.SlotKeyRoute{ + SlotType: protobuf.SlotTypes_Primary, + SlotKey: "Slot1", + }, + }, + } + + result, err := config.toRoutesProtobuf() + + assert.Equal(t, expected, result) + assert.Nil(t, err) +} + +func TestByAddressRoute(t *testing.T) { + config := NewByAddressRoute("localhost", int32(6739)) + expected := &protobuf.Routes{ + Value: &protobuf.Routes_ByAddressRoute{ + ByAddressRoute: &protobuf.ByAddressRoute{Host: "localhost", Port: 6739}, + }, + } + + result, err := config.toRoutesProtobuf() + + assert.Equal(t, expected, result) + assert.Nil(t, err) +} + +func TestByAddressRouteWithHost(t *testing.T) { + config, _ := NewByAddressRouteWithHost("localhost:6739") + expected := &protobuf.Routes{ + Value: &protobuf.Routes_ByAddressRoute{ + ByAddressRoute: &protobuf.ByAddressRoute{Host: "localhost", Port: 6739}, + }, + } + + result, err := config.toRoutesProtobuf() + + assert.Equal(t, expected, result) + assert.Nil(t, err) +} + +func TestByAddressRoute_MultiplePorts(t *testing.T) { + _, err := NewByAddressRouteWithHost("localhost:6739:6740") + assert.NotNil(t, err) +} + +func TestByAddressRoute_InvalidHost(t *testing.T) { + _, err := NewByAddressRouteWithHost("localhost") + assert.NotNil(t, err) +} diff --git a/go/src/lib.rs b/go/src/lib.rs index 361bf320f6..376da58dfa 100644 --- a/go/src/lib.rs +++ b/go/src/lib.rs @@ -2,13 +2,19 @@ #![deny(unsafe_op_in_unsafe_fn)] use glide_core::client::Client as GlideClient; +use glide_core::command_request::SimpleRoutes; +use glide_core::command_request::{Routes, SlotTypes}; use glide_core::connection_request; use glide_core::errors; use glide_core::errors::RequestErrorType; use glide_core::request_type::RequestType; use glide_core::ConnectionRequest; use protobuf::Message; -use redis::{RedisResult, Value}; +use redis::cluster_routing::{ + MultipleNodeRoutingInfo, Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr, +}; +use redis::cluster_routing::{ResponsePolicy, Routable}; +use redis::{Cmd, RedisResult, Value}; use std::slice::from_raw_parts; use std::{ ffi::{c_void, CString}, @@ -515,6 +521,8 @@ pub unsafe extern "C" fn command( arg_count: c_ulong, args: *const usize, args_len: *const c_ulong, + route_bytes: *const u8, + route_bytes_len: usize, ) { let client_adapter = unsafe { Box::leak(Box::from_raw(client_adapter_ptr as *mut ClientAdapter)) }; @@ -536,8 +544,14 @@ pub unsafe extern "C" fn command( cmd.arg(command_arg); } + let r_bytes = unsafe { std::slice::from_raw_parts(route_bytes, route_bytes_len) }; + + let route = Routes::parse_from_bytes(r_bytes).unwrap(); + client_adapter.runtime.spawn(async move { - let result = client_clone.send_command(&cmd, None).await; + let result = client_clone + .send_command(&cmd, get_route(route, Some(&cmd))) + .await; let client_adapter = unsafe { Box::leak(Box::from_raw(ptr_address as *mut ClientAdapter)) }; let value = match result { Ok(value) => value, @@ -573,3 +587,65 @@ pub unsafe extern "C" fn command( } }); } + +fn get_route(route: Routes, cmd: Option<&Cmd>) -> Option { + use glide_core::command_request::routes::Value; + let route = route.value?; + let get_response_policy = |cmd: Option<&Cmd>| { + cmd.and_then(|cmd| { + cmd.command() + .and_then(|cmd| ResponsePolicy::for_command(&cmd)) + }) + }; + match route { + Value::SimpleRoutes(simple_route) => { + let simple_route = simple_route.enum_value().unwrap(); + match simple_route { + SimpleRoutes::AllNodes => Some(RoutingInfo::MultiNode(( + MultipleNodeRoutingInfo::AllNodes, + get_response_policy(cmd), + ))), + SimpleRoutes::AllPrimaries => Some(RoutingInfo::MultiNode(( + MultipleNodeRoutingInfo::AllMasters, + get_response_policy(cmd), + ))), + SimpleRoutes::Random => { + Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) + } + } + } + Value::SlotKeyRoute(slot_key_route) => Some(RoutingInfo::SingleNode( + SingleNodeRoutingInfo::SpecificNode(Route::new( + redis::cluster_topology::get_slot(slot_key_route.slot_key.as_bytes()), + get_slot_addr(&slot_key_route.slot_type), + )), + )), + Value::SlotIdRoute(slot_id_route) => Some(RoutingInfo::SingleNode( + SingleNodeRoutingInfo::SpecificNode(Route::new( + slot_id_route.slot_id as u16, + get_slot_addr(&slot_id_route.slot_type), + )), + )), + Value::ByAddressRoute(by_address_route) => match u16::try_from(by_address_route.port) { + Ok(port) => Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress { + host: by_address_route.host.to_string(), + port, + })), + Err(_) => { + // TODO: Handle error propagation. + None + } + }, + _ => panic!("unknown route type"), + } +} + +fn get_slot_addr(slot_type: &protobuf::EnumOrUnknown) -> SlotAddr { + slot_type + .enum_value() + .map(|slot_type| match slot_type { + SlotTypes::Primary => SlotAddr::Master, + SlotTypes::Replica => SlotAddr::ReplicaRequired, + }) + .expect("Received unexpected slot id type") +}