Skip to content

Commit

Permalink
GO: Adding Route support (#2836)
Browse files Browse the repository at this point in the history
* GO: Adding Route support

Signed-off-by: Janhavi Gupta <janhavigupta@google.com>
  • Loading branch information
janhavigupta007 authored Dec 24, 2024
1 parent 64428f6 commit 2d78c84
Show file tree
Hide file tree
Showing 4 changed files with 389 additions and 4 deletions.
26 changes: 24 additions & 2 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,18 @@ func (client *baseClient) Close() {
client.coreClient = nil
}

func (client *baseClient) executeCommand(
func (client *baseClient) executeCommand(requestType C.RequestType, args []string) (*C.struct_CommandResponse, error) {
return client.executeCommandWithRoute(requestType, args, nil)
}

func (client *baseClient) executeCommandWithRoute(
requestType C.RequestType,
args []string,
route route,
) (*C.struct_CommandResponse, error) {
if client.coreClient == nil {
return nil, &ClosingError{"ExecuteCommand failed. The client is closed."}
}

var cArgsPtr *C.uintptr_t = nil
var argLengthsPtr *C.ulong = nil
if len(args) > 0 {
Expand All @@ -123,13 +127,31 @@ func (client *baseClient) executeCommand(
resultChannel := make(chan payload)
resultChannelPtr := uintptr(unsafe.Pointer(&resultChannel))

var routeBytesPtr *C.uchar = nil
var routeBytesCount C.uintptr_t = 0
if route != nil {
routeProto, err := route.toRoutesProtobuf()
if err != nil {
return nil, &RequestError{"ExecuteCommand failed due to invalid route"}
}
msg, err := proto.Marshal(routeProto)
if err != nil {
return nil, err
}

routeBytesCount = C.uintptr_t(len(msg))
routeBytesPtr = (*C.uchar)(C.CBytes(msg))
}

C.command(
client.coreClient,
C.uintptr_t(resultChannelPtr),
uint32(requestType),
C.size_t(len(args)),
cArgsPtr,
argLengthsPtr,
routeBytesPtr,
routeBytesCount,
)
payload := <-resultChannel
if payload.error != nil {
Expand Down
191 changes: 191 additions & 0 deletions go/api/request_routing_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0

package api

import (
"fmt"
"strconv"
"strings"

"github.com/valkey-io/valkey-glide/go/glide/protobuf"
)

// Request routing basic interface. Please use one of the following:
// - [api.SimpleNodeRoute]
// - [api.SlotIdRoute]
// - [api.SlotKeyRoute]
// - [api.ByAddressRoute]
type route interface {
toRoutesProtobuf() (*protobuf.Routes, error)
}

type SimpleNodeRoute int

const (
// Route request to all nodes.
// Warning: Don't use it with write commands, they could be routed to a replica (RO) node and fail.
AllNodes SimpleNodeRoute = iota
// Route request to all primary nodes.
AllPrimaries
// Route request to a random node.
// Warning: Don't use it with write commands, because they could be randomly routed to a replica (RO) node and fail.
RandomRoute
)

func (simpleNodeRoute SimpleNodeRoute) toRoutesProtobuf() (*protobuf.Routes, error) {
simpleRouteProto, err := mapSimpleNodeRoute(simpleNodeRoute)
if err != nil {
return nil, err
}

request := &protobuf.Routes{
Value: &protobuf.Routes_SimpleRoutes{
SimpleRoutes: simpleRouteProto,
},
}
return request, nil
}

func mapSimpleNodeRoute(simpleNodeRoute SimpleNodeRoute) (protobuf.SimpleRoutes, error) {
switch simpleNodeRoute {
case AllNodes:
return protobuf.SimpleRoutes_AllNodes, nil
case AllPrimaries:
return protobuf.SimpleRoutes_AllPrimaries, nil
case RandomRoute:
return protobuf.SimpleRoutes_Random, nil
default:
return protobuf.SimpleRoutes_Random, &RequestError{"Invalid simple node route"}
}
}

// Defines type of the node being addressed.
type SlotType int

const (
// Address a primary node.
SlotTypePrimary SlotType = iota
// Address a replica node.
SlotTypeReplica
)

func mapSlotType(slotType SlotType) (protobuf.SlotTypes, error) {
switch slotType {
case SlotTypePrimary:
return protobuf.SlotTypes_Primary, nil
case SlotTypeReplica:
return protobuf.SlotTypes_Replica, nil
default:
return protobuf.SlotTypes_Primary, &RequestError{"Invalid slot type"}
}
}

// Request routing configuration overrides the [api.ReadFrom] connection configuration.
// If SlotTypeReplica is used, the request will be routed to a replica, even if the strategy is ReadFrom [api.PreferReplica].
type SlotIdRoute struct {
slotType SlotType
slotID int32
}

// - slotType: Defines type of the node being addressed.
// - slotId: Slot number. There are 16384 slots in a Valkey cluster, and each shard manages a slot range. Unless the slot is
// known, it's better to route using [api.SlotTypePrimary].
func NewSlotIdRoute(slotType SlotType, slotId int32) *SlotIdRoute {
return &SlotIdRoute{slotType: slotType, slotID: slotId}
}

func (slotIdRoute *SlotIdRoute) toRoutesProtobuf() (*protobuf.Routes, error) {
slotType, err := mapSlotType(slotIdRoute.slotType)
if err != nil {
return nil, err
}

request := &protobuf.Routes{
Value: &protobuf.Routes_SlotIdRoute{
SlotIdRoute: &protobuf.SlotIdRoute{
SlotType: slotType,
SlotId: slotIdRoute.slotID,
},
},
}
return request, nil
}

// Request routing configuration overrides the [api.ReadFrom] connection configuration.
// If SlotTypeReplica is used, the request will be routed to a replica, even if the strategy is ReadFrom [api.PreferReplica].
type SlotKeyRoute struct {
slotType SlotType
slotKey string
}

// - slotType: Defines type of the node being addressed.
// - slotKey: The request will be sent to nodes managing this key.
func NewSlotKeyRoute(slotType SlotType, slotKey string) *SlotKeyRoute {
return &SlotKeyRoute{slotType: slotType, slotKey: slotKey}
}

func (slotKeyRoute *SlotKeyRoute) toRoutesProtobuf() (*protobuf.Routes, error) {
slotType, err := mapSlotType(slotKeyRoute.slotType)
if err != nil {
return nil, err
}

request := &protobuf.Routes{
Value: &protobuf.Routes_SlotKeyRoute{
SlotKeyRoute: &protobuf.SlotKeyRoute{
SlotType: slotType,
SlotKey: slotKeyRoute.slotKey,
},
},
}
return request, nil
}

// Routes a request to a node by its address.
type ByAddressRoute struct {
host string
port int32
}

// Create a route using hostname/address and port.
// - host: The endpoint of the node. If port is not provided, host should be in the "address:port" format, where address is the
// preferred endpoint as shown in the output of the CLUSTER SLOTS command.
// - port: The port to access the node. If port is not provided, host is assumed to be in the format "address:port".
func NewByAddressRoute(host string, port int32) *ByAddressRoute {
return &ByAddressRoute{host: host, port: port}
}

// Create a route using address string formatted as "address:port".
func NewByAddressRouteWithHost(host string) (*ByAddressRoute, error) {
split := strings.Split(host, ":")
if len(split) != 2 {
return nil, &RequestError{
fmt.Sprintf(
"no port provided, or host is not in the expected format 'hostname:port'. Received: %s", host,
),
}
}

port, err := strconv.ParseInt(split[1], 10, 32)
if err != nil {
return nil, &RequestError{
fmt.Sprintf(
"port must be a valid integer. Received: %s", split[1],
),
}
}

return &ByAddressRoute{host: split[0], port: int32(port)}, nil
}

func (byAddressRoute *ByAddressRoute) toRoutesProtobuf() (*protobuf.Routes, error) {
request := &protobuf.Routes{
Value: &protobuf.Routes_ByAddressRoute{
ByAddressRoute: &protobuf.ByAddressRoute{
Host: byAddressRoute.host,
Port: byAddressRoute.port,
},
},
}
return request, nil
}
96 changes: 96 additions & 0 deletions go/api/request_routing_config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0

package api

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/valkey-io/valkey-glide/go/glide/protobuf"
)

func TestSimpleNodeRoute(t *testing.T) {
config := AllNodes
expected := &protobuf.Routes{
Value: &protobuf.Routes_SimpleRoutes{
SimpleRoutes: protobuf.SimpleRoutes_AllNodes,
},
}

result, err := config.toRoutesProtobuf()

assert.Equal(t, expected, result)
assert.Nil(t, err)
}

func TestSlotIdRoute(t *testing.T) {
config := NewSlotIdRoute(SlotTypePrimary, int32(100))
expected := &protobuf.Routes{
Value: &protobuf.Routes_SlotIdRoute{
SlotIdRoute: &protobuf.SlotIdRoute{
SlotType: protobuf.SlotTypes_Primary,
SlotId: 100,
},
},
}

result, err := config.toRoutesProtobuf()

assert.Equal(t, expected, result)
assert.Nil(t, err)
}

func TestSlotKeyRoute(t *testing.T) {
config := NewSlotKeyRoute(SlotTypePrimary, "Slot1")
expected := &protobuf.Routes{
Value: &protobuf.Routes_SlotKeyRoute{
SlotKeyRoute: &protobuf.SlotKeyRoute{
SlotType: protobuf.SlotTypes_Primary,
SlotKey: "Slot1",
},
},
}

result, err := config.toRoutesProtobuf()

assert.Equal(t, expected, result)
assert.Nil(t, err)
}

func TestByAddressRoute(t *testing.T) {
config := NewByAddressRoute("localhost", int32(6739))
expected := &protobuf.Routes{
Value: &protobuf.Routes_ByAddressRoute{
ByAddressRoute: &protobuf.ByAddressRoute{Host: "localhost", Port: 6739},
},
}

result, err := config.toRoutesProtobuf()

assert.Equal(t, expected, result)
assert.Nil(t, err)
}

func TestByAddressRouteWithHost(t *testing.T) {
config, _ := NewByAddressRouteWithHost("localhost:6739")
expected := &protobuf.Routes{
Value: &protobuf.Routes_ByAddressRoute{
ByAddressRoute: &protobuf.ByAddressRoute{Host: "localhost", Port: 6739},
},
}

result, err := config.toRoutesProtobuf()

assert.Equal(t, expected, result)
assert.Nil(t, err)
}

func TestByAddressRoute_MultiplePorts(t *testing.T) {
_, err := NewByAddressRouteWithHost("localhost:6739:6740")
assert.NotNil(t, err)
}

func TestByAddressRoute_InvalidHost(t *testing.T) {
_, err := NewByAddressRouteWithHost("localhost")
assert.NotNil(t, err)
}
Loading

0 comments on commit 2d78c84

Please sign in to comment.