Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reintroduce the random connection strategy #1313

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ conn.SetConnMaxLifetime(time.Hour)
* username/password - auth credentials
* database - select the current default database
* dial_timeout - a duration string is a possibly signed sequence of decimal numbers, each with optional fraction and a unit suffix such as "300ms", "1s". Valid time units are "ms", "s", "m". (default 30s)
* connection_open_strategy - round_robin/in_order (default in_order).
* round_robin - choose a round-robin server from the set
* connection_open_strategy - random/round_robin/in_order (default in_order).
* random - choose random server from the set
* round_robin - choose a round-robin server from the set
* in_order - first live server is chosen in specified order
* debug - enable debug output (boolean value)
* compress - compress - specify the compression algorithm - “none” (default), `zstd`, `lz4`, `gzip`, `deflate`, `br`. If set to `true`, `lz4` will be used.
Expand Down
4 changes: 4 additions & 0 deletions clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"sync/atomic"
"time"

Expand Down Expand Up @@ -233,13 +234,16 @@ func (ch *clickhouse) dial(ctx context.Context) (conn *connect, err error) {
}

func DefaultDialStrategy(ctx context.Context, connID int, opt *Options, dial Dial) (r DialResult, err error) {
random := rand.Int()
for i := range opt.Addr {
var num int
switch opt.ConnOpenStrategy {
case ConnOpenInOrder:
num = i
case ConnOpenRoundRobin:
num = (int(connID) + i) % len(opt.Addr)
case ConnOpenRandom:
num = (random + i) % len(opt.Addr)
}

if r, err = dial(ctx, opt.Addr[num], opt); err == nil {
Expand Down
3 changes: 3 additions & 0 deletions clickhouse_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type ConnOpenStrategy uint8
const (
ConnOpenInOrder ConnOpenStrategy = iota
ConnOpenRoundRobin
ConnOpenRandom
)

type Protocol int
Expand Down Expand Up @@ -265,6 +266,8 @@ func (o *Options) fromDSN(in string) error {
o.ConnOpenStrategy = ConnOpenInOrder
case "round_robin":
o.ConnOpenStrategy = ConnOpenRoundRobin
case "random":
o.ConnOpenStrategy = ConnOpenRandom
}
case "max_open_conns":
maxOpenConns, err := strconv.Atoi(params.Get(v))
Expand Down
4 changes: 4 additions & 0 deletions clickhouse_std.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"fmt"
"io"
"log"
"math/rand"
"os"
"reflect"
"strings"
Expand Down Expand Up @@ -81,13 +82,16 @@ func (o *stdConnOpener) Connect(ctx context.Context) (_ driver.Conn, err error)
return nil, ErrAcquireConnNoAddress
}

random := rand.Int()
for i := range o.opt.Addr {
var num int
switch o.opt.ConnOpenStrategy {
case ConnOpenInOrder:
num = i
case ConnOpenRoundRobin:
num = (int(connID) + i) % len(o.opt.Addr)
case ConnOpenRandom:
num = (random + i) % len(o.opt.Addr)
}
if conn, err = dialFunc(ctx, o.opt.Addr[num], connID, o.opt); err == nil {
var debugf = func(format string, v ...any) {}
Expand Down
1 change: 1 addition & 0 deletions examples/clickhouse_api/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func TestIterableOrderedMapInsertRead(t *testing.T) {
func TestMultiHostConnect(t *testing.T) {
require.NoError(t, MultiHostVersion())
require.NoError(t, MultiHostRoundRobinVersion())
require.NoError(t, MultiHostRandomVersion())
}

func TestNested(t *testing.T) {
Expand Down
40 changes: 18 additions & 22 deletions examples/clickhouse_api/multi_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,40 +23,36 @@ import (
)

func MultiHostVersion() error {
return multiHostVersion(nil)
}

func MultiHostRoundRobinVersion() error {
connOpenStrategy := clickhouse.ConnOpenRoundRobin
return multiHostVersion(&connOpenStrategy)
}

func MultiHostRandomVersion() error {
connOpenStrategy := clickhouse.ConnOpenRandom
return multiHostVersion(&connOpenStrategy)
}

func multiHostVersion(connOpenStrategy *clickhouse.ConnOpenStrategy) error {
env, err := GetNativeTestEnvironment()
if err != nil {
return err
}
conn, err := clickhouse.Open(&clickhouse.Options{
options := clickhouse.Options{
Addr: []string{"127.0.0.1:9001", "127.0.0.1:9002", fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
if err != nil {
return err
}
v, err := conn.ServerVersion()
if err != nil {
return err
if connOpenStrategy != nil {
options.ConnOpenStrategy = *connOpenStrategy
}
fmt.Println(v.String())
return nil
}

func MultiHostRoundRobinVersion() error {
env, err := GetNativeTestEnvironment()
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{"127.0.0.1:9001", "127.0.0.1:9002", fmt.Sprintf("%s:%d", env.Host, env.Port)},
ConnOpenStrategy: clickhouse.ConnOpenRoundRobin,
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
conn, err := clickhouse.Open(&options)
if err != nil {
return err
}
Expand Down
54 changes: 19 additions & 35 deletions tests/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,39 +67,20 @@ func TestBadConn(t *testing.T) {
}

func TestConnFailover(t *testing.T) {
env, err := GetNativeTestEnvironment()
require.NoError(t, err)
useSSL, err := strconv.ParseBool(GetEnv("CLICKHOUSE_USE_SSL", "false"))
require.NoError(t, err)
port := env.Port
var tlsConfig *tls.Config
if useSSL {
port = env.SslPort
tlsConfig = &tls.Config{}
}
conn, err := GetConnectionWithOptions(&clickhouse.Options{
Addr: []string{
"127.0.0.1:9001",
"127.0.0.1:9002",
fmt.Sprintf("%s:%d", env.Host, port),
},
Auth: clickhouse.Auth{
Database: "default",
Username: env.Username,
Password: env.Password,
},
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
},
TLS: tlsConfig,
})
require.NoError(t, err)
require.NoError(t, conn.Ping(context.Background()))
t.Log(conn.ServerVersion())
t.Log(conn.Ping(context.Background()))
testConnFailover(t, nil)
}

func TestConnFailoverConnOpenRoundRobin(t *testing.T) {
func TestConnFailoverRoundRobin(t *testing.T) {
strategy := clickhouse.ConnOpenRoundRobin
testConnFailover(t, &strategy)
}

func TestConnFailoverRandom(t *testing.T) {
strategy := clickhouse.ConnOpenRandom
testConnFailover(t, &strategy)
}

func testConnFailover(t *testing.T, connOpenStrategy *clickhouse.ConnOpenStrategy) {
env, err := GetNativeTestEnvironment()
require.NoError(t, err)
useSSL, err := strconv.ParseBool(GetEnv("CLICKHOUSE_USE_SSL", "false"))
Expand All @@ -110,7 +91,7 @@ func TestConnFailoverConnOpenRoundRobin(t *testing.T) {
port = env.SslPort
tlsConfig = &tls.Config{}
}
conn, err := GetConnectionWithOptions(&clickhouse.Options{
options := clickhouse.Options{
Addr: []string{
"127.0.0.1:9001",
"127.0.0.1:9002",
Expand All @@ -124,9 +105,12 @@ func TestConnFailoverConnOpenRoundRobin(t *testing.T) {
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
},
ConnOpenStrategy: clickhouse.ConnOpenRoundRobin,
TLS: tlsConfig,
})
TLS: tlsConfig,
}
if connOpenStrategy != nil {
options.ConnOpenStrategy = *connOpenStrategy
}
conn, err := GetConnectionWithOptions(&options)
require.NoError(t, err)
require.NoError(t, conn.Ping(context.Background()))
t.Log(conn.ServerVersion())
Expand Down
54 changes: 26 additions & 28 deletions tests/std/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,42 +51,40 @@ func TestStdConn(t *testing.T) {
}

func TestStdConnFailover(t *testing.T) {
env, err := GetStdTestEnvironment()
require.NoError(t, err)
useSSL, err := strconv.ParseBool(clickhouse_tests.GetEnv("CLICKHOUSE_USE_SSL", "false"))
require.NoError(t, err)
dsns := map[string]string{"Native": fmt.Sprintf("clickhouse://%s:%s@127.0.0.1:9001,127.0.0.1:9002,%s:%d", env.Username, env.Password, env.Host, env.Port),
"Http": fmt.Sprintf("http://%s:%s@127.0.0.1:8124,127.0.0.1:8125,%s:%d", env.Username, env.Password, env.Host, env.HttpPort)}
if useSSL {
dsns = map[string]string{"Native": fmt.Sprintf("clickhouse://%s:%s@127.0.0.1:9001,127.0.0.1:9002,%s:%d?secure=true", env.Username, env.Password, env.Host, env.SslPort),
"Http": fmt.Sprintf("https://%s:%s@127.0.0.1:8124,127.0.0.1:8125,%s:%d?secure=true", env.Username, env.Password, env.Host, env.HttpsPort)}
}
for name, dsn := range dsns {
t.Run(fmt.Sprintf("%s Protocol", name), func(t *testing.T) {
testStdConnFailover(t, "")
}

if conn, err := sql.Open("clickhouse", dsn); assert.NoError(t, err) {
if err := conn.PingContext(context.Background()); assert.NoError(t, err) {
t.Log(conn.PingContext(context.Background()))
}
}
})
}
func TestStdConnFailoverRoundRobin(t *testing.T) {
testStdConnFailover(t, "round_robin")
}

func TestStdConnFailoverConnOpenRoundRobin(t *testing.T) {
func TestStdConnFailoverRandom(t *testing.T) {
testStdConnFailover(t, "random")
}

func testStdConnFailover(t *testing.T, openStrategy string) {
env, err := GetStdTestEnvironment()
require.NoError(t, err)
dsns := map[string]string{
"Native": fmt.Sprintf("clickhouse://%s:%s@127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003,127.0.0.1:9004,127.0.0.1:9005,127.0.0.1:9006,%s:%d/?connection_open_strategy=round_robin", env.Username, env.Password, env.Host, env.Port),
"Http": fmt.Sprintf("http://%s:%s@127.0.0.1:8124,127.0.0.1:8125,127.0.0.1:8126,127.0.0.1:8127,127.0.0.1:8128,127.0.0.1:8129,%s:%d/?connection_open_strategy=round_robin", env.Username, env.Password, env.Host, env.HttpPort),
}
useSSL, err := strconv.ParseBool(clickhouse_tests.GetEnv("CLICKHOUSE_USE_SSL", "false"))
require.NoError(t, err)
nativePort := env.Port
httpPort := env.HttpPort
argsList := []string{}
if useSSL {
dsns = map[string]string{
"Native": fmt.Sprintf("clickhouse://%s:%s@127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003,127.0.0.1:9004,127.0.0.1:9005,127.0.0.1:9006,%s:%d/?connection_open_strategy=round_robin&secure=true", env.Username, env.Password, env.Host, env.SslPort),
"Http": fmt.Sprintf("https://%s:%s@127.0.0.1:8124,127.0.0.1:8125,127.0.0.1:8126,127.0.0.1:8127,127.0.0.1:8128,127.0.0.1:8129,%s:%d/?connection_open_strategy=round_robin&secure=true", env.Username, env.Password, env.Host, env.HttpsPort),
}
nativePort = env.SslPort
httpPort = env.HttpsPort
argsList = append(argsList, "secure=true")
}
if len(openStrategy) > 0 {
argsList = append(argsList, fmt.Sprintf("connection_open_strategy=%s", openStrategy))
}
args := ""
if len(argsList) > 0 {
args = "?" + strings.Join(argsList, "&")
}
dsns := map[string]string{
"Native": fmt.Sprintf("clickhouse://%s:%s@127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003,127.0.0.1:9004,127.0.0.1:9005,127.0.0.1:9006,%s:%d/%s", env.Username, env.Password, env.Host, nativePort, args),
"Http": fmt.Sprintf("http://%s:%s@127.0.0.1:8124,127.0.0.1:8125,127.0.0.1:8126,127.0.0.1:8127,127.0.0.1:8128,127.0.0.1:8129,%s:%d/%s", env.Username, env.Password, env.Host, httpPort, args),
}
for name, dsn := range dsns {
t.Run(fmt.Sprintf("%s Protocol", name), func(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions tests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,8 @@ func optionsToDSN(o *clickhouse.Options) string {
strategy = "in_order"
case clickhouse.ConnOpenRoundRobin:
strategy = "round_robin"
case clickhouse.ConnOpenRandom:
strategy = "random"
}

params.Set("connection_open_strategy", strategy)
Expand Down
Loading