diff --git a/README.md b/README.md index ef08eb0aa6..0cef001afa 100644 --- a/README.md +++ b/README.md @@ -151,8 +151,9 @@ conn.SetConnMaxLifetime(time.Hour) * username/password - auth credentials * database - select the current default database * dial_timeout - a duration string is a possibly signed sequence of decimal numbers, each with optional fraction and a unit suffix such as "300ms", "1s". Valid time units are "ms", "s", "m". (default 30s) -* connection_open_strategy - round_robin/in_order (default in_order). - * round_robin - choose a round-robin server from the set +* connection_open_strategy - random/round_robin/in_order (default in_order). + * random - choose random server from the set + * round_robin - choose a round-robin server from the set * in_order - first live server is chosen in specified order * debug - enable debug output (boolean value) * compress - compress - specify the compression algorithm - “none” (default), `zstd`, `lz4`, `gzip`, `deflate`, `br`. If set to `true`, `lz4` will be used. diff --git a/clickhouse.go b/clickhouse.go index 78f0a6a721..d7ad40ec18 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "math/rand" "sync/atomic" "time" @@ -233,6 +234,7 @@ func (ch *clickhouse) dial(ctx context.Context) (conn *connect, err error) { } func DefaultDialStrategy(ctx context.Context, connID int, opt *Options, dial Dial) (r DialResult, err error) { + random := rand.Int() for i := range opt.Addr { var num int switch opt.ConnOpenStrategy { @@ -240,6 +242,8 @@ func DefaultDialStrategy(ctx context.Context, connID int, opt *Options, dial Dia num = i case ConnOpenRoundRobin: num = (int(connID) + i) % len(opt.Addr) + case ConnOpenRandom: + num = (random + i) % len(opt.Addr) } if r, err = dial(ctx, opt.Addr[num], opt); err == nil { diff --git a/clickhouse_options.go b/clickhouse_options.go index d64b04eb89..57376029db 100644 --- a/clickhouse_options.go +++ b/clickhouse_options.go @@ -87,6 +87,7 @@ type ConnOpenStrategy uint8 const ( ConnOpenInOrder ConnOpenStrategy = iota ConnOpenRoundRobin + ConnOpenRandom ) type Protocol int @@ -265,6 +266,8 @@ func (o *Options) fromDSN(in string) error { o.ConnOpenStrategy = ConnOpenInOrder case "round_robin": o.ConnOpenStrategy = ConnOpenRoundRobin + case "random": + o.ConnOpenStrategy = ConnOpenRandom } case "max_open_conns": maxOpenConns, err := strconv.Atoi(params.Get(v)) diff --git a/clickhouse_std.go b/clickhouse_std.go index 3afa3b6f3e..39eea1ce7d 100644 --- a/clickhouse_std.go +++ b/clickhouse_std.go @@ -25,6 +25,7 @@ import ( "fmt" "io" "log" + "math/rand" "os" "reflect" "strings" @@ -81,6 +82,7 @@ func (o *stdConnOpener) Connect(ctx context.Context) (_ driver.Conn, err error) return nil, ErrAcquireConnNoAddress } + random := rand.Int() for i := range o.opt.Addr { var num int switch o.opt.ConnOpenStrategy { @@ -88,6 +90,8 @@ func (o *stdConnOpener) Connect(ctx context.Context) (_ driver.Conn, err error) num = i case ConnOpenRoundRobin: num = (int(connID) + i) % len(o.opt.Addr) + case ConnOpenRandom: + num = (random + i) % len(o.opt.Addr) } if conn, err = dialFunc(ctx, o.opt.Addr[num], connID, o.opt); err == nil { var debugf = func(format string, v ...any) {} diff --git a/examples/clickhouse_api/main_test.go b/examples/clickhouse_api/main_test.go index 1fd6cfc08b..9d3bbb2cd0 100644 --- a/examples/clickhouse_api/main_test.go +++ b/examples/clickhouse_api/main_test.go @@ -163,6 +163,7 @@ func TestIterableOrderedMapInsertRead(t *testing.T) { func TestMultiHostConnect(t *testing.T) { require.NoError(t, MultiHostVersion()) require.NoError(t, MultiHostRoundRobinVersion()) + require.NoError(t, MultiHostRandomVersion()) } func TestNested(t *testing.T) { diff --git a/examples/clickhouse_api/multi_host.go b/examples/clickhouse_api/multi_host.go index 8ed249bc17..421daefbf1 100644 --- a/examples/clickhouse_api/multi_host.go +++ b/examples/clickhouse_api/multi_host.go @@ -23,40 +23,36 @@ import ( ) func MultiHostVersion() error { + return multiHostVersion(nil) +} + +func MultiHostRoundRobinVersion() error { + connOpenStrategy := clickhouse.ConnOpenRoundRobin + return multiHostVersion(&connOpenStrategy) +} + +func MultiHostRandomVersion() error { + connOpenStrategy := clickhouse.ConnOpenRandom + return multiHostVersion(&connOpenStrategy) +} + +func multiHostVersion(connOpenStrategy *clickhouse.ConnOpenStrategy) error { env, err := GetNativeTestEnvironment() if err != nil { return err } - conn, err := clickhouse.Open(&clickhouse.Options{ + options := clickhouse.Options{ Addr: []string{"127.0.0.1:9001", "127.0.0.1:9002", fmt.Sprintf("%s:%d", env.Host, env.Port)}, Auth: clickhouse.Auth{ Database: env.Database, Username: env.Username, Password: env.Password, }, - }) - if err != nil { - return err } - v, err := conn.ServerVersion() - if err != nil { - return err + if connOpenStrategy != nil { + options.ConnOpenStrategy = *connOpenStrategy } - fmt.Println(v.String()) - return nil -} - -func MultiHostRoundRobinVersion() error { - env, err := GetNativeTestEnvironment() - conn, err := clickhouse.Open(&clickhouse.Options{ - Addr: []string{"127.0.0.1:9001", "127.0.0.1:9002", fmt.Sprintf("%s:%d", env.Host, env.Port)}, - ConnOpenStrategy: clickhouse.ConnOpenRoundRobin, - Auth: clickhouse.Auth{ - Database: env.Database, - Username: env.Username, - Password: env.Password, - }, - }) + conn, err := clickhouse.Open(&options) if err != nil { return err } diff --git a/tests/conn_test.go b/tests/conn_test.go index d5d2bc29ef..4a8fc9a4ca 100644 --- a/tests/conn_test.go +++ b/tests/conn_test.go @@ -67,39 +67,20 @@ func TestBadConn(t *testing.T) { } func TestConnFailover(t *testing.T) { - env, err := GetNativeTestEnvironment() - require.NoError(t, err) - useSSL, err := strconv.ParseBool(GetEnv("CLICKHOUSE_USE_SSL", "false")) - require.NoError(t, err) - port := env.Port - var tlsConfig *tls.Config - if useSSL { - port = env.SslPort - tlsConfig = &tls.Config{} - } - conn, err := GetConnectionWithOptions(&clickhouse.Options{ - Addr: []string{ - "127.0.0.1:9001", - "127.0.0.1:9002", - fmt.Sprintf("%s:%d", env.Host, port), - }, - Auth: clickhouse.Auth{ - Database: "default", - Username: env.Username, - Password: env.Password, - }, - Compression: &clickhouse.Compression{ - Method: clickhouse.CompressionLZ4, - }, - TLS: tlsConfig, - }) - require.NoError(t, err) - require.NoError(t, conn.Ping(context.Background())) - t.Log(conn.ServerVersion()) - t.Log(conn.Ping(context.Background())) + testConnFailover(t, nil) } -func TestConnFailoverConnOpenRoundRobin(t *testing.T) { +func TestConnFailoverRoundRobin(t *testing.T) { + strategy := clickhouse.ConnOpenRoundRobin + testConnFailover(t, &strategy) +} + +func TestConnFailoverRandom(t *testing.T) { + strategy := clickhouse.ConnOpenRandom + testConnFailover(t, &strategy) +} + +func testConnFailover(t *testing.T, connOpenStrategy *clickhouse.ConnOpenStrategy) { env, err := GetNativeTestEnvironment() require.NoError(t, err) useSSL, err := strconv.ParseBool(GetEnv("CLICKHOUSE_USE_SSL", "false")) @@ -110,7 +91,7 @@ func TestConnFailoverConnOpenRoundRobin(t *testing.T) { port = env.SslPort tlsConfig = &tls.Config{} } - conn, err := GetConnectionWithOptions(&clickhouse.Options{ + options := clickhouse.Options{ Addr: []string{ "127.0.0.1:9001", "127.0.0.1:9002", @@ -124,9 +105,12 @@ func TestConnFailoverConnOpenRoundRobin(t *testing.T) { Compression: &clickhouse.Compression{ Method: clickhouse.CompressionLZ4, }, - ConnOpenStrategy: clickhouse.ConnOpenRoundRobin, - TLS: tlsConfig, - }) + TLS: tlsConfig, + } + if connOpenStrategy != nil { + options.ConnOpenStrategy = *connOpenStrategy + } + conn, err := GetConnectionWithOptions(&options) require.NoError(t, err) require.NoError(t, conn.Ping(context.Background())) t.Log(conn.ServerVersion()) diff --git a/tests/std/conn_test.go b/tests/std/conn_test.go index 96ae2c4b7c..c80239e257 100644 --- a/tests/std/conn_test.go +++ b/tests/std/conn_test.go @@ -51,42 +51,40 @@ func TestStdConn(t *testing.T) { } func TestStdConnFailover(t *testing.T) { - env, err := GetStdTestEnvironment() - require.NoError(t, err) - useSSL, err := strconv.ParseBool(clickhouse_tests.GetEnv("CLICKHOUSE_USE_SSL", "false")) - require.NoError(t, err) - dsns := map[string]string{"Native": fmt.Sprintf("clickhouse://%s:%s@127.0.0.1:9001,127.0.0.1:9002,%s:%d", env.Username, env.Password, env.Host, env.Port), - "Http": fmt.Sprintf("http://%s:%s@127.0.0.1:8124,127.0.0.1:8125,%s:%d", env.Username, env.Password, env.Host, env.HttpPort)} - if useSSL { - dsns = map[string]string{"Native": fmt.Sprintf("clickhouse://%s:%s@127.0.0.1:9001,127.0.0.1:9002,%s:%d?secure=true", env.Username, env.Password, env.Host, env.SslPort), - "Http": fmt.Sprintf("https://%s:%s@127.0.0.1:8124,127.0.0.1:8125,%s:%d?secure=true", env.Username, env.Password, env.Host, env.HttpsPort)} - } - for name, dsn := range dsns { - t.Run(fmt.Sprintf("%s Protocol", name), func(t *testing.T) { + testStdConnFailover(t, "") +} - if conn, err := sql.Open("clickhouse", dsn); assert.NoError(t, err) { - if err := conn.PingContext(context.Background()); assert.NoError(t, err) { - t.Log(conn.PingContext(context.Background())) - } - } - }) - } +func TestStdConnFailoverRoundRobin(t *testing.T) { + testStdConnFailover(t, "round_robin") } -func TestStdConnFailoverConnOpenRoundRobin(t *testing.T) { +func TestStdConnFailoverRandom(t *testing.T) { + testStdConnFailover(t, "random") +} + +func testStdConnFailover(t *testing.T, openStrategy string) { env, err := GetStdTestEnvironment() require.NoError(t, err) - dsns := map[string]string{ - "Native": fmt.Sprintf("clickhouse://%s:%s@127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003,127.0.0.1:9004,127.0.0.1:9005,127.0.0.1:9006,%s:%d/?connection_open_strategy=round_robin", env.Username, env.Password, env.Host, env.Port), - "Http": fmt.Sprintf("http://%s:%s@127.0.0.1:8124,127.0.0.1:8125,127.0.0.1:8126,127.0.0.1:8127,127.0.0.1:8128,127.0.0.1:8129,%s:%d/?connection_open_strategy=round_robin", env.Username, env.Password, env.Host, env.HttpPort), - } useSSL, err := strconv.ParseBool(clickhouse_tests.GetEnv("CLICKHOUSE_USE_SSL", "false")) require.NoError(t, err) + nativePort := env.Port + httpPort := env.HttpPort + argsList := []string{} if useSSL { - dsns = map[string]string{ - "Native": fmt.Sprintf("clickhouse://%s:%s@127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003,127.0.0.1:9004,127.0.0.1:9005,127.0.0.1:9006,%s:%d/?connection_open_strategy=round_robin&secure=true", env.Username, env.Password, env.Host, env.SslPort), - "Http": fmt.Sprintf("https://%s:%s@127.0.0.1:8124,127.0.0.1:8125,127.0.0.1:8126,127.0.0.1:8127,127.0.0.1:8128,127.0.0.1:8129,%s:%d/?connection_open_strategy=round_robin&secure=true", env.Username, env.Password, env.Host, env.HttpsPort), - } + nativePort = env.SslPort + httpPort = env.HttpsPort + argsList = append(argsList, "secure=true") + } + if len(openStrategy) > 0 { + argsList = append(argsList, fmt.Sprintf("connection_open_strategy=%s", openStrategy)) + } + args := "" + if len(argsList) > 0 { + args = "?" + strings.Join(argsList, "&") + } + dsns := map[string]string{ + "Native": fmt.Sprintf("clickhouse://%s:%s@127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003,127.0.0.1:9004,127.0.0.1:9005,127.0.0.1:9006,%s:%d/%s", env.Username, env.Password, env.Host, nativePort, args), + "Http": fmt.Sprintf("http://%s:%s@127.0.0.1:8124,127.0.0.1:8125,127.0.0.1:8126,127.0.0.1:8127,127.0.0.1:8128,127.0.0.1:8129,%s:%d/%s", env.Username, env.Password, env.Host, httpPort, args), } for name, dsn := range dsns { t.Run(fmt.Sprintf("%s Protocol", name), func(t *testing.T) { diff --git a/tests/utils.go b/tests/utils.go index 312aec60a7..4b73140c5c 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -715,6 +715,8 @@ func optionsToDSN(o *clickhouse.Options) string { strategy = "in_order" case clickhouse.ConnOpenRoundRobin: strategy = "round_robin" + case clickhouse.ConnOpenRandom: + strategy = "random" } params.Set("connection_open_strategy", strategy)