Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GO: Adding Route support #2836

Merged
merged 21 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
ac2cfad
GO:Handling interface response
janhavigupta007 Oct 21, 2024
1fcdb42
Merge remote-tracking branch 'upstream/main' into go/generic_commands
janhavigupta007 Nov 4, 2024
7d2fdbc
Fixing lint
janhavigupta007 Nov 4, 2024
cc59ece
Adding more tests for custom command
janhavigupta007 Nov 15, 2024
eb3adf8
GO: Fixing errors to support version 1.18
janhavigupta007 Nov 18, 2024
114dcb6
GO: Fixing the rust crate errors
janhavigupta007 Nov 18, 2024
820f5a4
Merge remote-tracking branch 'upstream/main' into go/generic_commands
janhavigupta007 Nov 18, 2024
6d4635b
GO: Adding Route support
janhavigupta007 Nov 26, 2024
8bf4b69
Merge remote-tracking branch 'upstream/main' into go/route
janhavigupta007 Dec 18, 2024
0659f5e
Small fixes in lint
janhavigupta007 Dec 18, 2024
0c2fed2
Small fixes in lint
janhavigupta007 Dec 18, 2024
5ea5e53
Merge branch 'go/route' of github.com:janhavigupta007/glide-for-redis…
janhavigupta007 Dec 18, 2024
e1ba414
Merge branch 'go/route' of github.com:janhavigupta007/glide-for-redis…
janhavigupta007 Dec 18, 2024
87f6ea6
Merge branch 'go/route' of github.com:janhavigupta007/glide-for-redis…
janhavigupta007 Dec 18, 2024
e2c8ef1
GO: Lint fixes
janhavigupta007 Dec 18, 2024
e3e87b5
GO: Lint fixes
janhavigupta007 Dec 18, 2024
a987e36
Merge branch 'go/route' of github.com:janhavigupta007/glide-for-redis…
janhavigupta007 Dec 23, 2024
8b7162a
Merge remote-tracking branch 'upstream/main' into go/route
janhavigupta007 Dec 23, 2024
e901542
Go:Addressing comments
janhavigupta007 Dec 23, 2024
c5a002c
Go:Addressing comments
janhavigupta007 Dec 23, 2024
b70e17e
Merge branch 'go/route' of github.com:janhavigupta007/glide-for-redis…
janhavigupta007 Dec 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 33 additions & 18 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,17 @@ func (client *baseClient) Close() {
}

func (client *baseClient) executeCommand(requestType C.RequestType, args []string) (*C.struct_CommandResponse, error) {
return client.executeCommandWithRoute(requestType, args, nil)
}

func (client *baseClient) executeCommandWithRoute(
requestType C.RequestType,
args []string,
route route,
) (*C.struct_CommandResponse, error) {
if client.coreClient == nil {
return nil, &ClosingError{"ExecuteCommand failed. The client is closed."}
}

var cArgsPtr *C.uintptr_t = nil
var argLengthsPtr *C.ulong = nil
if len(args) > 0 {
Expand All @@ -118,13 +125,31 @@ func (client *baseClient) executeCommand(requestType C.RequestType, args []strin
resultChannel := make(chan payload)
resultChannelPtr := uintptr(unsafe.Pointer(&resultChannel))

var routeBytesPtr *C.uchar = nil
var routeBytesCount C.uintptr_t = 0
if route != nil {
routeProto, err := route.toRoutesProtobuf()
if err != nil {
return nil, &RequestError{"ExecuteCommand failed due to invalid route"}
}
msg, err := proto.Marshal(routeProto)
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}

routeBytesCount = C.uintptr_t(len(msg))
routeBytesPtr = (*C.uchar)(C.CBytes(msg))
}

C.command(
client.coreClient,
C.uintptr_t(resultChannelPtr),
uint32(requestType),
C.size_t(len(args)),
cArgsPtr,
argLengthsPtr,
routeBytesPtr,
routeBytesCount,
)
payload := <-resultChannel
if payload.error != nil {
Expand Down Expand Up @@ -948,32 +973,22 @@ func (client *baseClient) BLMove(
return handleStringOrNullResponse(result)
}

func (client *baseClient) Ping() (string, error) {
func (client *baseClient) Ping() (Result[string], error) {
result, err := client.executeCommand(C.Ping, []string{})
if err != nil {
return "", err
return CreateNilStringResult(), err
}

response, err := handleStringResponse(result)
if err != nil {
return "", err
}
return response.Value(), nil
return handleStringResponse(result)
}

func (client *baseClient) PingWithMessage(message string) (string, error) {
args := []string{message}

result, err := client.executeCommand(C.Ping, args)
func (client *baseClient) PingWithMessage(message string) (Result[string], error) {
result, err := client.executeCommand(C.Ping, []string{message})
if err != nil {
return "", err
return CreateNilStringResult(), err
}

response, err := handleStringResponse(result)
if err != nil {
return "", err
}
return response.Value(), nil
return handleStringResponse(result)
}

func (client *baseClient) Del(keys []string) (Result[int64], error) {
Expand Down
4 changes: 2 additions & 2 deletions go/api/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ type ConnectionManagementCommands interface {
// result, err := client.Ping("Hello")
//
// [valkey.io]: https://valkey.io/commands/ping/
Ping() (string, error)
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
Ping() (Result[string], error)

// Pings the server with a custom message.
//
Expand All @@ -737,5 +737,5 @@ type ConnectionManagementCommands interface {
// result, err := client.PingWithMessage("Hello")
//
// [valkey.io]: https://valkey.io/commands/ping/
PingWithMessage(message string) (string, error)
PingWithMessage(message string) (Result[string], error)
}
59 changes: 59 additions & 0 deletions go/api/glide_cluster_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

package api

// #cgo LDFLAGS: -L../target/release -lglide_rs
// #include "../lib.h"
//
// void successCallback(void *channelPtr, struct CommandResponse *message);
// void failureCallback(void *channelPtr, char *errMessage, RequestErrorType errType);
import "C"

// GlideClusterClient is a client used for connection in cluster mode.
type GlideClusterClient struct {
*baseClient
Expand All @@ -16,3 +23,55 @@ func NewGlideClusterClient(config *GlideClusterClientConfiguration) (*GlideClust

return &GlideClusterClient{client}, nil
}

// Pings the server and returns "PONG".
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
//
// Paramters:
//
// route - Specifies the routing configuration for the command. The client will route the command to the nodes defined by
//
// route.
//
// Return value:
//
// A Result[string] containing "PONG" is returned.
//
// For example:
//
// result, err := client.PingWithRoute(api.SimpleNodeRouteAllPrimaries)
//
// [valkey.io]: https://valkey.io/commands/ping/
func (client *GlideClusterClient) PingWithRoute(route route) (Result[string], error) {
result, err := client.executeCommandWithRoute(C.Ping, []string{}, route)
if err != nil {
return CreateNilStringResult(), err
}

return handleStringResponse(result)
}

// Pings the server and returns the message.
//
// Paramters:
//
// route - Specifies the routing configuration for the command. The client will route the command to the nodes defined by
//
// route.
//
// Return value:
//
// A Result[string] containing message is returned.
//
// For example:
//
// result, err := client.PingWithRouteAndMessage("Hello", api.SimpleNodeRouteAllPrimaries)
//
// [valkey.io]: https://valkey.io/commands/ping/
func (client *GlideClusterClient) PingWithRouteAndMessage(message string, route route) (Result[string], error) {
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
result, err := client.executeCommandWithRoute(C.Ping, []string{message}, route)
if err != nil {
return CreateNilStringResult(), err
}

return handleStringResponse(result)
}
189 changes: 189 additions & 0 deletions go/api/request_routing_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package api
janhavigupta007 marked this conversation as resolved.
Show resolved Hide resolved

import (
"fmt"
"strconv"
"strings"

"github.com/valkey-io/valkey-glide/go/glide/protobuf"
)

// Request routing basic interface. Please use one of the following:
// - [api.SimpleNodeRoute]
// - [api.SlotIdRoute]
// - [api.SlotKeyRoute]
// - [api.ByAddressRoute]
type route interface {
toRoutesProtobuf() (*protobuf.Routes, error)
}

type SimpleNodeRoute int

const (
// Route request to all nodes.
// Warning: Don't use it with write commands, they could be routed to a replica (RO) node and fail.
SimpleNodeRouteAllNodes SimpleNodeRoute = iota
janhavigupta007 marked this conversation as resolved.
Show resolved Hide resolved
// Route request to all primary nodes.
SimpleNodeRouteAllPrimaries
// Route request to a random node.
// Warning: Don't use it with write commands, because they could be randomly routed to a replica (RO) node and fail.
SimpleNodeRouteRandom
)

func (simpleNodeRoute SimpleNodeRoute) toRoutesProtobuf() (*protobuf.Routes, error) {
simpleRouteProto, err := mapSimpleNodeRoute(simpleNodeRoute)
if err != nil {
return nil, err
}

request := &protobuf.Routes{
Value: &protobuf.Routes_SimpleRoutes{
SimpleRoutes: simpleRouteProto,
},
}
return request, nil
}

func mapSimpleNodeRoute(simpleNodeRoute SimpleNodeRoute) (protobuf.SimpleRoutes, error) {
switch simpleNodeRoute {
case SimpleNodeRouteAllNodes:
return protobuf.SimpleRoutes_AllNodes, nil
case SimpleNodeRouteAllPrimaries:
return protobuf.SimpleRoutes_AllPrimaries, nil
case SimpleNodeRouteRandom:
return protobuf.SimpleRoutes_Random, nil
default:
return protobuf.SimpleRoutes_Random, &RequestError{"Invalid simple node route"}
}
}

// Defines type of the node being addressed.
type SlotType int

const (
// Address a primary node.
SlotTypePrimary SlotType = iota
// Address a replica node.
SlotTypeReplica
)

func mapSlotType(slotType SlotType) (protobuf.SlotTypes, error) {
switch slotType {
case SlotTypePrimary:
return protobuf.SlotTypes_Primary, nil
case SlotTypeReplica:
return protobuf.SlotTypes_Replica, nil
default:
return protobuf.SlotTypes_Primary, &RequestError{"Invalid slot type"}
}
}

// Request routing configuration overrides the [api.ReadFrom] connection configuration.
// If SlotTypeReplica is used, the request will be routed to a replica, even if the strategy is ReadFrom [api.PreferReplica].
type SlotIdRoute struct {
// Defines type of the node being addressed.
slotType SlotType
// Slot number. There are 16384 slots in a Valkey cluster, and each shard manages a slot range.
// Unless the slot is known, it's better to route using [api.SlotTypePrimary].
slotID int32
}

func NewSlotIdRoute(slotType SlotType, slotId int32) *SlotIdRoute {
return &SlotIdRoute{slotType: slotType, slotID: slotId}
}

func (slotIdRoute *SlotIdRoute) toRoutesProtobuf() (*protobuf.Routes, error) {
slotType, err := mapSlotType(slotIdRoute.slotType)
if err != nil {
return nil, err
}

request := &protobuf.Routes{
Value: &protobuf.Routes_SlotIdRoute{
SlotIdRoute: &protobuf.SlotIdRoute{
SlotType: slotType,
SlotId: slotIdRoute.slotID,
},
},
}
return request, nil
}

// Request routing configuration overrides the [api.ReadFrom] connection configuration.
// If SlotTypeReplica is used, the request will be routed to a replica, even if the strategy is ReadFrom [api.PreferReplica].
type SlotKeyRoute struct {
// Defines type of the node being addressed.
slotType SlotType
// The request will be sent to nodes managing this key.
slotKey string
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
}

func NewSlotKeyRoute(slotType SlotType, slotKey string) *SlotKeyRoute {
return &SlotKeyRoute{slotType: slotType, slotKey: slotKey}
}

func (slotKeyRoute *SlotKeyRoute) toRoutesProtobuf() (*protobuf.Routes, error) {
slotType, err := mapSlotType(slotKeyRoute.slotType)
if err != nil {
return nil, err
}

request := &protobuf.Routes{
Value: &protobuf.Routes_SlotKeyRoute{
SlotKeyRoute: &protobuf.SlotKeyRoute{
SlotType: slotType,
SlotKey: slotKeyRoute.slotKey,
},
},
}
return request, nil
}

// Routes a request to a node by its address.
type ByAddressRoute struct {
// The endpoint of the node. If port is not provided, host should be in the "address:port" format, where address is the
// preferred endpoint as shown in the output of the CLUSTER SLOTS command.
host string
// The port to access the node. If port is not provided, host is assumed to be in the format "address:port".
port int32
}

// Create a route using hostname/address and port.
func NewByAddressRoute(host string, port int32) *ByAddressRoute {
return &ByAddressRoute{host: host, port: port}
}

// Create a route using address string formatted as "address:port".
func NewByAddressRouteWithHost(host string) (*ByAddressRoute, error) {
split := strings.Split(host, ":")
if len(split) < 2 || len(split) > 2 {
janhavigupta007 marked this conversation as resolved.
Show resolved Hide resolved
return nil, &RequestError{
fmt.Sprintf(
"no port provided, or host is not in the expected format 'hostname:port'. Received: %s", host,
),
}
}

port, err := strconv.ParseInt(split[1], 10, 32)
if err != nil {
return nil, &RequestError{
fmt.Sprintf(
"port must be a valid integer. Received: %s", split[1],
),
}
}

return &ByAddressRoute{host: split[0], port: int32(port)}, nil
}

func (byAddressRoute *ByAddressRoute) toRoutesProtobuf() (*protobuf.Routes, error) {
request := &protobuf.Routes{
Value: &protobuf.Routes_ByAddressRoute{
ByAddressRoute: &protobuf.ByAddressRoute{
Host: byAddressRoute.host,
Port: byAddressRoute.port,
},
},
}
return request, nil
}
Loading
Loading