From 6fc8af9f78e8277326329f888bd6cfff3b111b0f Mon Sep 17 00:00:00 2001 From: Dale Mcdiarmid Date: Wed, 2 Nov 2022 17:12:18 +0000 Subject: [PATCH 01/13] compress column by column over native --- clickhouse_options.go | 32 ++++++++------- conn.go | 90 +++++++++++++++++++++++-------------------- conn_http.go | 23 ++++++----- lib/proto/block.go | 13 ++++++- 4 files changed, 91 insertions(+), 67 deletions(-) diff --git a/clickhouse_options.go b/clickhouse_options.go index cf412dd6c0..4f992c7462 100644 --- a/clickhouse_options.go +++ b/clickhouse_options.go @@ -116,20 +116,21 @@ func ParseDSN(dsn string) (*Options, error) { type Options struct { Protocol Protocol - TLS *tls.Config - Addr []string - Auth Auth - DialContext func(ctx context.Context, addr string) (net.Conn, error) - Debug bool - Debugf func(format string, v ...interface{}) // only works when Debug is true - Settings Settings - Compression *Compression - DialTimeout time.Duration // default 1 second - MaxOpenConns int // default MaxIdleConns + 5 - MaxIdleConns int // default 5 - ConnMaxLifetime time.Duration // default 1 hour - ConnOpenStrategy ConnOpenStrategy - BlockBufferSize uint8 // default 2 - can be overwritten on query + TLS *tls.Config + Addr []string + Auth Auth + DialContext func(ctx context.Context, addr string) (net.Conn, error) + Debug bool + Debugf func(format string, v ...interface{}) // only works when Debug is true + Settings Settings + Compression *Compression + DialTimeout time.Duration // default 1 second + MaxOpenConns int // default MaxIdleConns + 5 + MaxIdleConns int // default 5 + ConnMaxLifetime time.Duration // default 1 hour + ConnOpenStrategy ConnOpenStrategy + BlockBufferSize uint8 // default 2 - can be overwritten on query + MaxBufferBeforeCompression int // default 1048576 - measured in bytes i.e. 1mb scheme string ReadTimeout time.Duration @@ -290,6 +291,9 @@ func (o Options) setDefaults() *Options { if o.BlockBufferSize <= 0 { o.BlockBufferSize = 2 } + if o.MaxBufferBeforeCompression <= 0 { + o.MaxBufferBeforeCompression = 1048576 + } if o.Addr == nil || len(o.Addr) == 0 { switch o.Protocol { case Native: diff --git a/conn.go b/conn.go index 785bde05ef..13db3bc6b7 100644 --- a/conn.go +++ b/conn.go @@ -72,18 +72,19 @@ func dial(ctx context.Context, addr string, num int, opt *Options) (*connect, er var ( connect = &connect{ - opt: opt, - conn: conn, - debugf: debugf, - buffer: new(chproto.Buffer), - reader: chproto.NewReader(conn), - revision: proto.ClientTCPProtocolVersion, - structMap: &structMap{}, - compression: compression, - connectedAt: time.Now(), - compressor: compress.NewWriter(), - readTimeout: opt.ReadTimeout, - blockBufferSize: opt.BlockBufferSize, + opt: opt, + conn: conn, + debugf: debugf, + buffer: new(chproto.Buffer), + reader: chproto.NewReader(conn), + revision: proto.ClientTCPProtocolVersion, + structMap: &structMap{}, + compression: compression, + connectedAt: time.Now(), + compressor: compress.NewWriter(), + readTimeout: opt.ReadTimeout, + blockBufferSize: opt.BlockBufferSize, + maxBufferBeforeCompression: opt.MaxBufferBeforeCompression, } ) if err := connect.handshake(opt.Auth.Database, opt.Auth.Username, opt.Auth.Password); err != nil { @@ -101,21 +102,22 @@ func dial(ctx context.Context, addr string, num int, opt *Options) (*connect, er // https://github.com/ClickHouse/ClickHouse/blob/master/src/Client/Connection.cpp type connect struct { - opt *Options - conn net.Conn - debugf func(format string, v ...interface{}) - server ServerVersion - closed bool - buffer *chproto.Buffer - reader *chproto.Reader - released bool - revision uint64 - structMap *structMap - compression CompressionMethod - connectedAt time.Time - compressor *compress.Writer - readTimeout time.Duration - blockBufferSize uint8 + opt *Options + conn net.Conn + debugf func(format string, v ...interface{}) + server ServerVersion + closed bool + buffer *chproto.Buffer + reader *chproto.Reader + released bool + revision uint64 + structMap *structMap + compression CompressionMethod + connectedAt time.Time + compressor *compress.Writer + readTimeout time.Duration + blockBufferSize uint8 + maxBufferBeforeCompression int } func (c *connect) settings(querySettings Settings) []proto.Setting { @@ -177,25 +179,29 @@ func (c *connect) exception() error { return &e } +func (c *connect) flushBuffer(buffer *chproto.Buffer, from int, end bool) (int, error) { + if len(buffer.Buf) > c.maxBufferBeforeCompression || (end && len(buffer.Buf) > 0) { + if c.compression != CompressionNone { + data := buffer.Buf[from:] + if err := c.compressor.Compress(compress.Method(c.compression), data); err != nil { + return 0, errors.Wrap(err, "compress") + } + buffer.Buf = append(c.buffer.Buf[:from], c.compressor.Data...) + } + if err := c.flush(); err != nil { + return 0, err + } + buffer.Reset() + return 0, nil + } + return from, nil +} + func (c *connect) sendData(block *proto.Block, name string) error { c.debugf("[send data] compression=%t", c.compression) c.buffer.PutByte(proto.ClientData) c.buffer.PutString(name) - // Saving offset of compressible data. - start := len(c.buffer.Buf) - if err := block.Encode(c.buffer, c.revision); err != nil { - return err - } - if c.compression != CompressionNone { - // Performing compression. Note: only blocks are compressed. - data := c.buffer.Buf[start:] - if err := c.compressor.Compress(compress.Method(c.compression), data); err != nil { - return errors.Wrap(err, "compress") - } - c.buffer.Buf = append(c.buffer.Buf[:start], c.compressor.Data...) - } - - if err := c.flush(); err != nil { + if err := block.Encode(c.buffer, c.flushBuffer, c.revision); err != nil { return err } defer func() { diff --git a/conn_http.go b/conn_http.go index e547c05fb8..7dd8bd2b0b 100644 --- a/conn_http.go +++ b/conn_http.go @@ -331,20 +331,23 @@ func createCompressionPool(compression *Compression) (Pool[HTTPReaderWriter], er return pool, nil } +func (h *httpConnect) flushBuffer(buffer *chproto.Buffer, from int, end bool) (int, error) { + if end && len(buffer.Buf) > 0 && (h.compression == CompressionLZ4 || h.compression == CompressionZSTD) { + data := buffer.Buf[from:] + if err := h.blockCompressor.Compress(compress.Method(h.compression), data); err != nil { + return 0, errors.Wrap(err, "compress") + } + buffer.Buf = append(buffer.Buf[:from], h.blockCompressor.Data...) + return len(buffer.Buf), nil + } + return from, nil +} + func (h *httpConnect) writeData(block *proto.Block) error { // Saving offset of compressible data - start := len(h.buffer.Buf) - if err := block.Encode(h.buffer, 0); err != nil { + if err := block.Encode(h.buffer, h.flushBuffer, 0); err != nil { return err } - if h.compression == CompressionLZ4 || h.compression == CompressionZSTD { - // Performing compression. Supported and requires - data := h.buffer.Buf[start:] - if err := h.blockCompressor.Compress(compress.Method(h.compression), data); err != nil { - return errors.Wrap(err, "compress") - } - h.buffer.Buf = append(h.buffer.Buf[:start], h.blockCompressor.Data...) - } return nil } diff --git a/lib/proto/block.go b/lib/proto/block.go index a94bcec816..ede6c364f3 100644 --- a/lib/proto/block.go +++ b/lib/proto/block.go @@ -117,7 +117,10 @@ func difference(a, b []string) []string { return diff } -func (b *Block) Encode(buffer *proto.Buffer, revision uint64) error { +type flusher func(buffer *proto.Buffer, from int, end bool) (int, error) + +func (b *Block) Encode(buffer *proto.Buffer, flush flusher, revision uint64) (err error) { + start := len(buffer.Buf) if revision > 0 { encodeBlockInfo(buffer) } @@ -149,6 +152,14 @@ func (b *Block) Encode(buffer *proto.Buffer, revision uint64) error { } } c.Encode(buffer) + // invoke flush on each column + if start, err = flush(buffer, start, false); err != nil { + return err + } + } + // flush at end - indicate no more data + if start, err = flush(buffer, start, true); err != nil { + return err } return nil } From 1f6699d03e6d419a1ee0ad935002e21165fd4fcc Mon Sep 17 00:00:00 2001 From: Dale Mcdiarmid Date: Thu, 3 Nov 2022 09:52:10 +0000 Subject: [PATCH 02/13] refer to Buffer --- conn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conn.go b/conn.go index 13db3bc6b7..2aa6429011 100644 --- a/conn.go +++ b/conn.go @@ -186,7 +186,7 @@ func (c *connect) flushBuffer(buffer *chproto.Buffer, from int, end bool) (int, if err := c.compressor.Compress(compress.Method(c.compression), data); err != nil { return 0, errors.Wrap(err, "compress") } - buffer.Buf = append(c.buffer.Buf[:from], c.compressor.Data...) + buffer.Buf = append(buffer.Buf[:from], c.compressor.Data...) } if err := c.flush(); err != nil { return 0, err From 7290aec37f68c07e83b9c53a6285b7c3c0c6a451 Mon Sep 17 00:00:00 2001 From: Dale Mcdiarmid Date: Thu, 3 Nov 2022 10:36:32 +0000 Subject: [PATCH 03/13] simple test --- benchmark/v2/read-native/basic_test.go | 18 +++++ benchmark/v2/write-native/write_test.go | 96 +++++++++++++++++++++++++ 2 files changed, 114 insertions(+) create mode 100644 benchmark/v2/write-native/write_test.go diff --git a/benchmark/v2/read-native/basic_test.go b/benchmark/v2/read-native/basic_test.go index 9005e3c3c6..9f0678ea6a 100644 --- a/benchmark/v2/read-native/basic_test.go +++ b/benchmark/v2/read-native/basic_test.go @@ -1,3 +1,20 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package main import ( @@ -32,6 +49,7 @@ func getConnection() clickhouse.Conn { } return conn } + func BenchmarkRead(b *testing.B) { b.Run("string", benchmarkStringRead) b.Run("random", benchmarkRandom) diff --git a/benchmark/v2/write-native/write_test.go b/benchmark/v2/write-native/write_test.go new file mode 100644 index 0000000000..d89f2fdf37 --- /dev/null +++ b/benchmark/v2/write-native/write_test.go @@ -0,0 +1,96 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package main + +import ( + "context" + "github.com/ClickHouse/clickhouse-go/v2" + "log" + "testing" + "time" +) + +func getConnection() clickhouse.Conn { + conn, err := clickhouse.Open(&clickhouse.Options{ + Addr: []string{"127.0.0.1:9000"}, + Auth: clickhouse.Auth{ + Database: "default", + Username: "default", + Password: "ClickHouse", + }, + //Debug: true, + DialTimeout: time.Second, + MaxOpenConns: 10, + MaxIdleConns: 5, + ConnMaxLifetime: time.Hour, + Compression: &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }, + BlockBufferSize: 100, + }) + if err != nil { + log.Fatal(err) + } + return conn +} + +func BenchmarkWrite(b *testing.B) { + b.Run("simple", benchmarkSimpleWrite) +} + +func benchmarkSimpleWrite(b *testing.B) { + conn := getConnection() + + if err := conn.Exec(context.Background(), "DROP TABLE IF EXISTS benchmark"); err != nil { + b.Fatal(err) + } + const ddl = ` + CREATE TABLE benchmark ( + Col1 UInt64 + , Col2 String + , Col3 Array(UInt8) + , Col4 DateTime + ) Engine Null + ` + + if err := conn.Exec(context.Background(), ddl); err != nil { + b.Fatal(err) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + batch, err := conn.PrepareBatch(context.Background(), "INSERT INTO benchmark") + if err != nil { + b.Fatal(err) + } + for c := 0; c < 1_000_000; c++ { + err := batch.Append( + uint64(i), + "Golang SQL database driver", + []uint8{1, 2, 3, 4, 5, 6, 7, 8, 9}, + time.Now(), + ) + if err != nil { + b.Fatal(err) + } + } + + if err := batch.Send(); err != nil { + b.Fatal(err) + } + } +} From bf76e6d359600e9d831e8c471985ad592f0b03c0 Mon Sep 17 00:00:00 2001 From: Dale Mcdiarmid Date: Thu, 3 Nov 2022 12:12:44 +0000 Subject: [PATCH 04/13] Better implementation --- benchmark/v2/write-native/write_test.go | 2 +- conn.go | 46 +++++++++++++++++-------- conn_http.go | 23 ++++++------- lib/proto/block.go | 32 +++++++++++------ 4 files changed, 63 insertions(+), 40 deletions(-) diff --git a/benchmark/v2/write-native/write_test.go b/benchmark/v2/write-native/write_test.go index d89f2fdf37..c77f36d2ec 100644 --- a/benchmark/v2/write-native/write_test.go +++ b/benchmark/v2/write-native/write_test.go @@ -77,7 +77,7 @@ func benchmarkSimpleWrite(b *testing.B) { if err != nil { b.Fatal(err) } - for c := 0; c < 1_000_000; c++ { + for c := 0; c < 10_000_000; c++ { err := batch.Append( uint64(i), "Golang SQL database driver", diff --git a/conn.go b/conn.go index 2aa6429011..66c168cdfd 100644 --- a/conn.go +++ b/conn.go @@ -179,29 +179,45 @@ func (c *connect) exception() error { return &e } -func (c *connect) flushBuffer(buffer *chproto.Buffer, from int, end bool) (int, error) { - if len(buffer.Buf) > c.maxBufferBeforeCompression || (end && len(buffer.Buf) > 0) { - if c.compression != CompressionNone { - data := buffer.Buf[from:] - if err := c.compressor.Compress(compress.Method(c.compression), data); err != nil { - return 0, errors.Wrap(err, "compress") - } - buffer.Buf = append(buffer.Buf[:from], c.compressor.Data...) - } - if err := c.flush(); err != nil { - return 0, err +func (c *connect) compressBuffer(start int) error { + if c.compression != CompressionNone && len(c.buffer.Buf) > 0 { + data := c.buffer.Buf[start:] + if err := c.compressor.Compress(compress.Method(c.compression), data); err != nil { + return errors.Wrap(err, "compress") } - buffer.Reset() - return 0, nil + c.buffer.Buf = append(c.buffer.Buf[:start], c.compressor.Data...) } - return from, nil + return nil } func (c *connect) sendData(block *proto.Block, name string) error { c.debugf("[send data] compression=%t", c.compression) c.buffer.PutByte(proto.ClientData) c.buffer.PutString(name) - if err := block.Encode(c.buffer, c.flushBuffer, c.revision); err != nil { + // Saving offset of compressible data. + start := len(c.buffer.Buf) + if err := block.EncodeHeader(c.buffer, c.revision); err != nil { + return err + } + for i := range block.Columns { + if err := block.EncodeColumn(c.buffer, i); err != nil { + return err + } + if len(c.buffer.Buf) >= c.maxBufferBeforeCompression { + if err := c.compressBuffer(start); err != nil { + return err + } + if err := c.flush(); err != nil { + return err + } + start = 0 + c.buffer.Reset() + } + } + if err := c.compressBuffer(start); err != nil { + return err + } + if err := c.flush(); err != nil { return err } defer func() { diff --git a/conn_http.go b/conn_http.go index 7dd8bd2b0b..e547c05fb8 100644 --- a/conn_http.go +++ b/conn_http.go @@ -331,23 +331,20 @@ func createCompressionPool(compression *Compression) (Pool[HTTPReaderWriter], er return pool, nil } -func (h *httpConnect) flushBuffer(buffer *chproto.Buffer, from int, end bool) (int, error) { - if end && len(buffer.Buf) > 0 && (h.compression == CompressionLZ4 || h.compression == CompressionZSTD) { - data := buffer.Buf[from:] - if err := h.blockCompressor.Compress(compress.Method(h.compression), data); err != nil { - return 0, errors.Wrap(err, "compress") - } - buffer.Buf = append(buffer.Buf[:from], h.blockCompressor.Data...) - return len(buffer.Buf), nil - } - return from, nil -} - func (h *httpConnect) writeData(block *proto.Block) error { // Saving offset of compressible data - if err := block.Encode(h.buffer, h.flushBuffer, 0); err != nil { + start := len(h.buffer.Buf) + if err := block.Encode(h.buffer, 0); err != nil { return err } + if h.compression == CompressionLZ4 || h.compression == CompressionZSTD { + // Performing compression. Supported and requires + data := h.buffer.Buf[start:] + if err := h.blockCompressor.Compress(compress.Method(h.compression), data); err != nil { + return errors.Wrap(err, "compress") + } + h.buffer.Buf = append(h.buffer.Buf[:start], h.blockCompressor.Data...) + } return nil } diff --git a/lib/proto/block.go b/lib/proto/block.go index ede6c364f3..9015cd3685 100644 --- a/lib/proto/block.go +++ b/lib/proto/block.go @@ -117,10 +117,7 @@ func difference(a, b []string) []string { return diff } -type flusher func(buffer *proto.Buffer, from int, end bool) (int, error) - -func (b *Block) Encode(buffer *proto.Buffer, flush flusher, revision uint64) (err error) { - start := len(buffer.Buf) +func (b *Block) EncodeHeader(buffer *proto.Buffer, revision uint64) (err error) { if revision > 0 { encodeBlockInfo(buffer) } @@ -139,7 +136,12 @@ func (b *Block) Encode(buffer *proto.Buffer, flush flusher, revision uint64) (er } buffer.PutUVarInt(uint64(len(b.Columns))) buffer.PutUVarInt(uint64(rows)) - for _, c := range b.Columns { + return nil +} + +func (b *Block) EncodeColumn(buffer *proto.Buffer, i int) (err error) { + if i >= 0 && i < len(b.Columns) { + c := b.Columns[i] buffer.PutString(c.Name()) buffer.PutString(string(c.Type())) if serialize, ok := c.(column.CustomSerialization); ok { @@ -152,15 +154,23 @@ func (b *Block) Encode(buffer *proto.Buffer, flush flusher, revision uint64) (er } } c.Encode(buffer) - // invoke flush on each column - if start, err = flush(buffer, start, false); err != nil { - return err - } + return nil } - // flush at end - indicate no more data - if start, err = flush(buffer, start, true); err != nil { + return &BlockError{ + Op: "Encode", + Err: fmt.Errorf("%d is out of range of %d columns", i, len(b.Columns)), + } +} + +func (b *Block) Encode(buffer *proto.Buffer, revision uint64) (err error) { + if err := b.EncodeHeader(buffer, revision); err != nil { return err } + for i := range b.Columns { + if err := b.EncodeColumn(buffer, i); err != nil { + return err + } + } return nil } From 68d68696c57e08ccffeb75d07693523d1bfe20aa Mon Sep 17 00:00:00 2001 From: Dale Mcdiarmid Date: Mon, 7 Nov 2022 09:43:15 +0000 Subject: [PATCH 05/13] rename --- clickhouse_options.go | 34 ++++++++++++------------ conn.go | 60 +++++++++++++++++++++---------------------- 2 files changed, 47 insertions(+), 47 deletions(-) diff --git a/clickhouse_options.go b/clickhouse_options.go index 4f992c7462..e194928dd9 100644 --- a/clickhouse_options.go +++ b/clickhouse_options.go @@ -116,21 +116,21 @@ func ParseDSN(dsn string) (*Options, error) { type Options struct { Protocol Protocol - TLS *tls.Config - Addr []string - Auth Auth - DialContext func(ctx context.Context, addr string) (net.Conn, error) - Debug bool - Debugf func(format string, v ...interface{}) // only works when Debug is true - Settings Settings - Compression *Compression - DialTimeout time.Duration // default 1 second - MaxOpenConns int // default MaxIdleConns + 5 - MaxIdleConns int // default 5 - ConnMaxLifetime time.Duration // default 1 hour - ConnOpenStrategy ConnOpenStrategy - BlockBufferSize uint8 // default 2 - can be overwritten on query - MaxBufferBeforeCompression int // default 1048576 - measured in bytes i.e. 1mb + TLS *tls.Config + Addr []string + Auth Auth + DialContext func(ctx context.Context, addr string) (net.Conn, error) + Debug bool + Debugf func(format string, v ...interface{}) // only works when Debug is true + Settings Settings + Compression *Compression + DialTimeout time.Duration // default 1 second + MaxOpenConns int // default MaxIdleConns + 5 + MaxIdleConns int // default 5 + ConnMaxLifetime time.Duration // default 1 hour + ConnOpenStrategy ConnOpenStrategy + BlockBufferSize uint8 // default 2 - can be overwritten on query + MaxCompressionBuffer int // default 1048576 - measured in bytes i.e. 1mb scheme string ReadTimeout time.Duration @@ -291,8 +291,8 @@ func (o Options) setDefaults() *Options { if o.BlockBufferSize <= 0 { o.BlockBufferSize = 2 } - if o.MaxBufferBeforeCompression <= 0 { - o.MaxBufferBeforeCompression = 1048576 + if o.MaxCompressionBuffer <= 0 { + o.MaxCompressionBuffer = 1048576 } if o.Addr == nil || len(o.Addr) == 0 { switch o.Protocol { diff --git a/conn.go b/conn.go index 66c168cdfd..9dd07cfaec 100644 --- a/conn.go +++ b/conn.go @@ -72,19 +72,19 @@ func dial(ctx context.Context, addr string, num int, opt *Options) (*connect, er var ( connect = &connect{ - opt: opt, - conn: conn, - debugf: debugf, - buffer: new(chproto.Buffer), - reader: chproto.NewReader(conn), - revision: proto.ClientTCPProtocolVersion, - structMap: &structMap{}, - compression: compression, - connectedAt: time.Now(), - compressor: compress.NewWriter(), - readTimeout: opt.ReadTimeout, - blockBufferSize: opt.BlockBufferSize, - maxBufferBeforeCompression: opt.MaxBufferBeforeCompression, + opt: opt, + conn: conn, + debugf: debugf, + buffer: new(chproto.Buffer), + reader: chproto.NewReader(conn), + revision: proto.ClientTCPProtocolVersion, + structMap: &structMap{}, + compression: compression, + connectedAt: time.Now(), + compressor: compress.NewWriter(), + readTimeout: opt.ReadTimeout, + blockBufferSize: opt.BlockBufferSize, + maxCompressionBuffer: opt.MaxCompressionBuffer, } ) if err := connect.handshake(opt.Auth.Database, opt.Auth.Username, opt.Auth.Password); err != nil { @@ -102,22 +102,22 @@ func dial(ctx context.Context, addr string, num int, opt *Options) (*connect, er // https://github.com/ClickHouse/ClickHouse/blob/master/src/Client/Connection.cpp type connect struct { - opt *Options - conn net.Conn - debugf func(format string, v ...interface{}) - server ServerVersion - closed bool - buffer *chproto.Buffer - reader *chproto.Reader - released bool - revision uint64 - structMap *structMap - compression CompressionMethod - connectedAt time.Time - compressor *compress.Writer - readTimeout time.Duration - blockBufferSize uint8 - maxBufferBeforeCompression int + opt *Options + conn net.Conn + debugf func(format string, v ...interface{}) + server ServerVersion + closed bool + buffer *chproto.Buffer + reader *chproto.Reader + released bool + revision uint64 + structMap *structMap + compression CompressionMethod + connectedAt time.Time + compressor *compress.Writer + readTimeout time.Duration + blockBufferSize uint8 + maxCompressionBuffer int } func (c *connect) settings(querySettings Settings) []proto.Setting { @@ -203,7 +203,7 @@ func (c *connect) sendData(block *proto.Block, name string) error { if err := block.EncodeColumn(c.buffer, i); err != nil { return err } - if len(c.buffer.Buf) >= c.maxBufferBeforeCompression { + if len(c.buffer.Buf) >= c.maxCompressionBuffer { if err := c.compressBuffer(start); err != nil { return err } From bd6cca0ad05e496be2f32195521a4f87d2a5d1df Mon Sep 17 00:00:00 2001 From: Kuba Kaflik Date: Mon, 12 Dec 2022 23:22:20 +0100 Subject: [PATCH 06/13] do not overflow compression buffer --- conn.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/conn.go b/conn.go index 9dd07cfaec..fda28186fd 100644 --- a/conn.go +++ b/conn.go @@ -194,8 +194,9 @@ func (c *connect) sendData(block *proto.Block, name string) error { c.debugf("[send data] compression=%t", c.compression) c.buffer.PutByte(proto.ClientData) c.buffer.PutString(name) - // Saving offset of compressible data. - start := len(c.buffer.Buf) + + compressionOffset := len(c.buffer.Buf) + if err := block.EncodeHeader(c.buffer, c.revision); err != nil { return err } @@ -204,17 +205,20 @@ func (c *connect) sendData(block *proto.Block, name string) error { return err } if len(c.buffer.Buf) >= c.maxCompressionBuffer { - if err := c.compressBuffer(start); err != nil { + bufOverflow := c.buffer.Buf[c.maxCompressionBuffer:] + c.buffer.Buf = c.buffer.Buf[:c.maxCompressionBuffer] + + if err := c.compressBuffer(compressionOffset); err != nil { return err } if err := c.flush(); err != nil { return err } - start = 0 - c.buffer.Reset() + compressionOffset = 0 + c.buffer.Buf = bufOverflow } } - if err := c.compressBuffer(start); err != nil { + if err := c.compressBuffer(compressionOffset); err != nil { return err } if err := c.flush(); err != nil { From b6674c00371520a68de87f01a618804e058e000f Mon Sep 17 00:00:00 2001 From: Kuba Kaflik Date: Mon, 12 Dec 2022 23:48:13 +0100 Subject: [PATCH 07/13] benchmark --- .../write-compress-buffer-limit/write_test.go | 125 ++++++++++++++++++ 1 file changed, 125 insertions(+) create mode 100644 benchmark/v2/write-compress-buffer-limit/write_test.go diff --git a/benchmark/v2/write-compress-buffer-limit/write_test.go b/benchmark/v2/write-compress-buffer-limit/write_test.go new file mode 100644 index 0000000000..48de65eff0 --- /dev/null +++ b/benchmark/v2/write-compress-buffer-limit/write_test.go @@ -0,0 +1,125 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package main + +import ( + "context" + "github.com/ClickHouse/clickhouse-go/v2" + "log" + "testing" + "time" +) + +func getConnection(maxCompressionBuffer int) clickhouse.Conn { + conn, err := clickhouse.Open(&clickhouse.Options{ + Addr: []string{"127.0.0.1:9000"}, + Auth: clickhouse.Auth{ + Database: "default", + Username: "default", + //Password: "ClickHouse", + }, + //Debug: true, + DialTimeout: time.Second, + MaxOpenConns: 10, + MaxIdleConns: 5, + ConnMaxLifetime: time.Hour, + Compression: &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }, + BlockBufferSize: 100, + MaxCompressionBuffer: maxCompressionBuffer, + }) + if err != nil { + log.Fatal(err) + } + return conn +} + +func BenchmarkWrite1KB(b *testing.B) { + benchmarkCompressionBufferLimitedWrite(b, 1024) +} + +func BenchmarkWrite16KB(b *testing.B) { + benchmarkCompressionBufferLimitedWrite(b, 1024*16) +} + +func BenchmarkWrite64KB(b *testing.B) { + benchmarkCompressionBufferLimitedWrite(b, 1024*64) +} + +func BenchmarkWrite256KB(b *testing.B) { + benchmarkCompressionBufferLimitedWrite(b, 1024*256) +} + +func BenchmarkWrite512KB(b *testing.B) { + benchmarkCompressionBufferLimitedWrite(b, 1024*512) +} + +func BenchmarkWrite1MB(b *testing.B) { + benchmarkCompressionBufferLimitedWrite(b, 1024*1024) +} + +func BenchmarkWrite5MB(b *testing.B) { + benchmarkCompressionBufferLimitedWrite(b, 1024*1024*5) +} + +func BenchmarkWrite10MB(b *testing.B) { + benchmarkCompressionBufferLimitedWrite(b, 1024*1024*10) +} + +func benchmarkCompressionBufferLimitedWrite(b *testing.B, maxCompressionBuffer int) { + conn := getConnection(maxCompressionBuffer) + + if err := conn.Exec(context.Background(), "DROP TABLE IF EXISTS benchmark"); err != nil { + b.Fatal(err) + } + const ddl = ` + CREATE TABLE benchmark ( + Col1 UInt64 + , Col2 String + , Col3 Array(UInt8) + , Col4 DateTime + ) Engine Null + ` + + if err := conn.Exec(context.Background(), ddl); err != nil { + b.Fatal(err) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + batch, err := conn.PrepareBatch(context.Background(), "INSERT INTO benchmark") + if err != nil { + b.Fatal(err) + } + for c := 0; c < 10_000_000; c++ { + err := batch.Append( + uint64(i), + "Golang SQL database driver", + []uint8{1, 2, 3, 4, 5, 6, 7, 8, 9}, + time.Now(), + ) + if err != nil { + b.Fatal(err) + } + } + + if err := batch.Send(); err != nil { + b.Fatal(err) + } + } +} From 9904808a72ab32f2de4a5e42d23d67ec545ce718 Mon Sep 17 00:00:00 2001 From: Kuba Kaflik Date: Tue, 13 Dec 2022 12:38:04 +0100 Subject: [PATCH 08/13] print mem usage --- .../write-compress-buffer-limit/write_test.go | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/benchmark/v2/write-compress-buffer-limit/write_test.go b/benchmark/v2/write-compress-buffer-limit/write_test.go index 48de65eff0..39e08b70f0 100644 --- a/benchmark/v2/write-compress-buffer-limit/write_test.go +++ b/benchmark/v2/write-compress-buffer-limit/write_test.go @@ -19,12 +19,27 @@ package main import ( "context" + "fmt" "github.com/ClickHouse/clickhouse-go/v2" "log" + "runtime" "testing" "time" ) +func bToMb(b uint64) uint64 { + return b / 1024 / 1024 +} + +func PrintMemUsage() { + var m runtime.MemStats + runtime.ReadMemStats(&m) + fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc)) + fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc)) + fmt.Printf("\tSys = %v MiB", bToMb(m.Sys)) + fmt.Printf("\tNumGC = %v\n", m.NumGC) +} + func getConnection(maxCompressionBuffer int) clickhouse.Conn { conn, err := clickhouse.Open(&clickhouse.Options{ Addr: []string{"127.0.0.1:9000"}, @@ -83,6 +98,15 @@ func BenchmarkWrite10MB(b *testing.B) { } func benchmarkCompressionBufferLimitedWrite(b *testing.B, maxCompressionBuffer int) { + fmt.Sprintf("max compression buffer= %dB", maxCompressionBuffer) + + go func() { + for { + PrintMemUsage() + time.Sleep(time.Second) + } + }() + conn := getConnection(maxCompressionBuffer) if err := conn.Exec(context.Background(), "DROP TABLE IF EXISTS benchmark"); err != nil { From e6f97fdb74358301190cf8a5f2761ee9948932e6 Mon Sep 17 00:00:00 2001 From: Kuba Kaflik Date: Tue, 13 Dec 2022 12:52:50 +0100 Subject: [PATCH 09/13] remove buffer compression overflow --- conn.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/conn.go b/conn.go index fda28186fd..96f9778f2a 100644 --- a/conn.go +++ b/conn.go @@ -205,9 +205,6 @@ func (c *connect) sendData(block *proto.Block, name string) error { return err } if len(c.buffer.Buf) >= c.maxCompressionBuffer { - bufOverflow := c.buffer.Buf[c.maxCompressionBuffer:] - c.buffer.Buf = c.buffer.Buf[:c.maxCompressionBuffer] - if err := c.compressBuffer(compressionOffset); err != nil { return err } @@ -215,7 +212,6 @@ func (c *connect) sendData(block *proto.Block, name string) error { return err } compressionOffset = 0 - c.buffer.Buf = bufOverflow } } if err := c.compressBuffer(compressionOffset); err != nil { From c5c9499323ec32f5e407909b3035b500d0b03cd2 Mon Sep 17 00:00:00 2001 From: Kuba Kaflik Date: Tue, 13 Dec 2022 13:03:50 +0100 Subject: [PATCH 10/13] add buffor size debugf --- conn.go | 1 + 1 file changed, 1 insertion(+) diff --git a/conn.go b/conn.go index 96f9778f2a..fea337b42f 100644 --- a/conn.go +++ b/conn.go @@ -208,6 +208,7 @@ func (c *connect) sendData(block *proto.Block, name string) error { if err := c.compressBuffer(compressionOffset); err != nil { return err } + c.debugf("[buff compress] buffer size: %d", len(c.buffer.Buf)) if err := c.flush(); err != nil { return err } From 956f096f9bdd173081538757f22b3f0460e62da3 Mon Sep 17 00:00:00 2001 From: Kuba Kaflik Date: Fri, 16 Dec 2022 20:50:57 +0100 Subject: [PATCH 11/13] add DSN tests and README --- README.md | 5 +- clickhouse_options.go | 16 +- clickhouse_options_test.go | 318 +++++++++++++++++++++++++++++++++++++ 3 files changed, 336 insertions(+), 3 deletions(-) create mode 100644 clickhouse_options_test.go diff --git a/README.md b/README.md index 4da591e25b..8295b3693f 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,7 @@ Support for the ClickHouse protocol advanced features using `Context`: ConnMaxLifetime: time.Duration(10) * time.Minute, ConnOpenStrategy: clickhouse.ConnOpenInOrder, BlockBufferSize: 10, + MaxCompressionBuffer: 10240, }) if err != nil { return err @@ -117,6 +118,7 @@ conn := clickhouse.OpenDB(&clickhouse.Options{ }, Debug: true, BlockBufferSize: 10, + MaxCompressionBuffer: 10240, }) conn.SetMaxIdleConns(5) conn.SetMaxOpenConns(10) @@ -141,6 +143,7 @@ conn.SetConnMaxLifetime(time.Hour) - `zstd`, `lz4` - ignored * block_buffer_size - size of block buffer (default 2) * read_timeout - a duration string is a possibly signed sequence of decimal numbers, each with optional fraction and a unit suffix such as "300ms", "1s". Valid time units are "ms", "s", "m" (default 5m). +* max_compression_buffer - max size (bytes) of compression buffer during column by column compression (default 10MiB) SSL/TLS parameters: @@ -186,7 +189,7 @@ conn := clickhouse.OpenDB(&clickhouse.Options{ ## Compression -ZSTD/LZ4 compression is supported over native and http protocols. This is performed at a block level and is only used for inserts. +ZSTD/LZ4 compression is supported over native and http protocols. This is performed column by column at a block level and is only used for inserts. Compression buffer size is set as `MaxCompressionBuffer` option. If using `Open` via the std interface and specifying a DSN, compression can be enabled via the `compress` flag. Currently, this is a boolean flag which enables `LZ4` compression. diff --git a/clickhouse_options.go b/clickhouse_options.go index aa9cdcc0e9..272c3c99cd 100644 --- a/clickhouse_options.go +++ b/clickhouse_options.go @@ -22,6 +22,7 @@ import ( "crypto/tls" "fmt" "github.com/ClickHouse/ch-go/compress" + "github.com/pkg/errors" "net" "net/url" "strconv" @@ -131,7 +132,7 @@ type Options struct { ConnOpenStrategy ConnOpenStrategy HttpHeaders map[string]string // set additional headers on HTTP requests BlockBufferSize uint8 // default 2 - can be overwritten on query - MaxCompressionBuffer int // default 1048576 - measured in bytes i.e. 1mb + MaxCompressionBuffer int // default 10485760 - measured in bytes i.e. 10MiB scheme string ReadTimeout time.Duration @@ -142,6 +143,11 @@ func (o *Options) fromDSN(in string) error { if err != nil { return err } + + if dsn.Host == "" { + return errors.New("parse dsn address failed") + } + if o.Settings == nil { o.Settings = make(Settings) } @@ -191,6 +197,12 @@ func (o *Options) fromDSN(in string) error { } else { return err } + case "max_compression_buffer": + max, err := strconv.Atoi(params.Get(v)) + if err != nil { + return errors.Wrap(err, "max_compression_buffer invalid value") + } + o.MaxCompressionBuffer = max case "dial_timeout": duration, err := time.ParseDuration(params.Get(v)) if err != nil { @@ -293,7 +305,7 @@ func (o Options) setDefaults() *Options { o.BlockBufferSize = 2 } if o.MaxCompressionBuffer <= 0 { - o.MaxCompressionBuffer = 1048576 + o.MaxCompressionBuffer = 10485760 } if o.Addr == nil || len(o.Addr) == 0 { switch o.Protocol { diff --git a/clickhouse_options_test.go b/clickhouse_options_test.go new file mode 100644 index 0000000000..0c51e49357 --- /dev/null +++ b/clickhouse_options_test.go @@ -0,0 +1,318 @@ +package clickhouse + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +// TestParseDSN does not implement all use cases yet +func TestParseDSN(t *testing.T) { + testCases := []struct { + name string + dsn string + expected *Options + expectedErr string + }{ + { + "empty dsn", + "", + nil, + "parse dsn address failed", + }, + { + "no host", + "/test_database", + nil, + "parse dsn address failed", + }, + { + "no protocol", + "127.0.0.1/test_database", + nil, + "parse dsn address failed", + }, + { + "native protocol", + "clickhouse://127.0.0.1/test_database", + &Options{ + Protocol: Native, + TLS: nil, + Addr: []string{"127.0.0.1"}, + Settings: Settings{}, + Auth: Auth{ + Database: "test_database", + }, + scheme: "clickhouse", + }, + "", + }, + { + "http protocol", + "http://127.0.0.1/test_database", + &Options{ + Protocol: HTTP, + TLS: nil, + Addr: []string{"127.0.0.1"}, + Settings: Settings{}, + Auth: Auth{ + Database: "test_database", + }, + scheme: "http", + }, + "", + }, + { + "native protocol with user", + "clickhouse://user@127.0.0.1/test_database", + &Options{ + Protocol: Native, + TLS: nil, + Addr: []string{"127.0.0.1"}, + Settings: Settings{}, + Auth: Auth{ + Database: "test_database", + Username: "user", + }, + scheme: "clickhouse", + }, + "", + }, + { + "native protocol with authenticated user", + "clickhouse://joe:Ys31@127.0.0.1/test_database", + &Options{ + Protocol: Native, + TLS: nil, + Addr: []string{"127.0.0.1"}, + Settings: Settings{}, + Auth: Auth{ + Database: "test_database", + Username: "joe", + Password: "Ys31", + }, + scheme: "clickhouse", + }, + "", + }, + { + "native protocol with debug", + "clickhouse://127.0.0.1/test_database?debug=true", + &Options{ + Protocol: Native, + TLS: nil, + Addr: []string{"127.0.0.1"}, + Settings: Settings{}, + Auth: Auth{ + Database: "test_database", + }, + Debug: true, + scheme: "clickhouse", + }, + "", + }, + { + "native protocol with default lz4 compression", + "clickhouse://127.0.0.1/test_database?compress=true", + &Options{ + Protocol: Native, + TLS: nil, + Addr: []string{"127.0.0.1"}, + Settings: Settings{}, + Compression: &Compression{ + Method: CompressionLZ4, + }, + Auth: Auth{ + Database: "test_database", + }, + scheme: "clickhouse", + }, + "", + }, + { + "native protocol with none compression", + "clickhouse://127.0.0.1/test_database?compress=none", + &Options{ + Protocol: Native, + TLS: nil, + Addr: []string{"127.0.0.1"}, + Settings: Settings{}, + Compression: &Compression{ + Method: CompressionNone, + Level: 3, + }, + Auth: Auth{ + Database: "test_database", + }, + scheme: "clickhouse", + }, + "", + }, + { + "native protocol with zstd compression", + "clickhouse://127.0.0.1/test_database?compress=zstd", + &Options{ + Protocol: Native, + TLS: nil, + Addr: []string{"127.0.0.1"}, + Settings: Settings{}, + Compression: &Compression{ + Method: CompressionZSTD, + Level: 3, + }, + Auth: Auth{ + Database: "test_database", + }, + scheme: "clickhouse", + }, + "", + }, + { + "native protocol with lz4 compression", + "clickhouse://127.0.0.1/test_database?compress=lz4", + &Options{ + Protocol: Native, + TLS: nil, + Addr: []string{"127.0.0.1"}, + Settings: Settings{}, + Compression: &Compression{ + Method: CompressionLZ4, + Level: 3, + }, + Auth: Auth{ + Database: "test_database", + }, + scheme: "clickhouse", + }, + "", + }, + { + "native protocol with gzip compression", + "clickhouse://127.0.0.1/test_database?compress=gzip", + &Options{ + Protocol: Native, + TLS: nil, + Addr: []string{"127.0.0.1"}, + Settings: Settings{}, + Compression: &Compression{ + Method: CompressionGZIP, + Level: 3, + }, + Auth: Auth{ + Database: "test_database", + }, + scheme: "clickhouse", + }, + "", + }, + { + "native protocol with deflate compression", + "clickhouse://127.0.0.1/test_database?compress=deflate", + &Options{ + Protocol: Native, + TLS: nil, + Addr: []string{"127.0.0.1"}, + Settings: Settings{}, + Compression: &Compression{ + Method: CompressionDeflate, + Level: 3, + }, + Auth: Auth{ + Database: "test_database", + }, + scheme: "clickhouse", + }, + "", + }, + { + "native protocol with br compression", + "clickhouse://127.0.0.1/test_database?compress=br", + &Options{ + Protocol: Native, + TLS: nil, + Addr: []string{"127.0.0.1"}, + Settings: Settings{}, + Compression: &Compression{ + Method: CompressionBrotli, + Level: 3, + }, + Auth: Auth{ + Database: "test_database", + }, + scheme: "clickhouse", + }, + "", + }, + { + "native protocol with default lz4 compression and compression level 5", + "clickhouse://127.0.0.1/test_database?compress=true&compress_level=5", + &Options{ + Protocol: Native, + TLS: nil, + Addr: []string{"127.0.0.1"}, + Settings: Settings{}, + Compression: &Compression{ + Method: CompressionLZ4, + Level: 5, + }, + Auth: Auth{ + Database: "test_database", + }, + scheme: "clickhouse", + }, + "", + }, + { + "native protocol with 1KiB max compression buffer", + "clickhouse://127.0.0.1/test_database?max_compression_buffer=1024", + &Options{ + Protocol: Native, + TLS: nil, + Addr: []string{"127.0.0.1"}, + Settings: Settings{}, + MaxCompressionBuffer: 1024, + Auth: Auth{ + Database: "test_database", + }, + scheme: "clickhouse", + }, + "", + }, + { + "native protocol with invalid numeric max compression buffer", + "clickhouse://127.0.0.1/test_database?max_compression_buffer=onebyte", + nil, + "max_compression_buffer invalid value: strconv.Atoi: parsing \"onebyte\": invalid syntax", + }, + { + "native protocol dial timeout", + "clickhouse://127.0.0.1/test_database?max_compression_buffer=1024", + &Options{ + Protocol: Native, + TLS: nil, + Addr: []string{"127.0.0.1"}, + Settings: Settings{}, + MaxCompressionBuffer: 1024, + Auth: Auth{ + Database: "test_database", + }, + scheme: "clickhouse", + }, + "", + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + opts, err := ParseDSN(testCase.dsn) + + if testCase.expectedErr != "" { + assert.Nil(t, opts) + assert.Error(t, err, testCase.expectedErr) + return + } + + assert.Equal(t, testCase.expected, opts) + assert.Nil(t, err) + }) + } +} From a89e197fbe25aa1e21c2adc9b3003ac49389ed1a Mon Sep 17 00:00:00 2001 From: Kuba Kaflik Date: Fri, 16 Dec 2022 21:49:21 +0100 Subject: [PATCH 12/13] fix non trivial DSN parse issue --- clickhouse_options.go | 36 ++++++++++++++++++++---------------- clickhouse_options_test.go | 8 +++++++- 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/clickhouse_options.go b/clickhouse_options.go index 272c3c99cd..7537257d3b 100644 --- a/clickhouse_options.go +++ b/clickhouse_options.go @@ -168,35 +168,39 @@ func (o *Options) fromDSN(in string) error { o.Debug, _ = strconv.ParseBool(params.Get(v)) case "compress": if on, _ := strconv.ParseBool(params.Get(v)); on { - o.Compression = &Compression{ - Method: CompressionLZ4, + if o.Compression == nil { + o.Compression = &Compression{} } + + o.Compression.Method = CompressionLZ4 + continue } if compressMethod, ok := compressionMap[params.Get(v)]; ok { if o.Compression == nil { o.Compression = &Compression{ - Method: compressMethod, // default for now same as Clickhouse - https://clickhouse.com/docs/en/operations/settings/settings#settings-http_zlib_compression_level Level: 3, } - } else { - o.Compression.Method = compressMethod } + + o.Compression.Method = compressMethod } case "compress_level": - if level, err := strconv.ParseInt(params.Get(v), 10, 8); err == nil { - if o.Compression == nil { - o.Compression = &Compression{ - // a level alone doesn't enable compression - Method: CompressionNone, - Level: int(level), - } - } else { - o.Compression.Level = int(level) + level, err := strconv.ParseInt(params.Get(v), 10, 8) + if err != nil { + return errors.Wrap(err, "compress_level invalid value") + } + + if o.Compression == nil { + o.Compression = &Compression{ + // a level alone doesn't enable compression + Method: CompressionNone, + Level: int(level), } - } else { - return err + continue } + + o.Compression.Level = int(level) case "max_compression_buffer": max, err := strconv.Atoi(params.Get(v)) if err != nil { diff --git a/clickhouse_options_test.go b/clickhouse_options_test.go index 0c51e49357..34b0a4dd40 100644 --- a/clickhouse_options_test.go +++ b/clickhouse_options_test.go @@ -283,6 +283,12 @@ func TestParseDSN(t *testing.T) { nil, "max_compression_buffer invalid value: strconv.Atoi: parsing \"onebyte\": invalid syntax", }, + { + "native protocol with invalid numeric compress level", + "clickhouse://127.0.0.1/test_database?compress_level=first", + nil, + "compress_level invalid value: strconv.ParseInt: parsing \"first\": invalid syntax", + }, { "native protocol dial timeout", "clickhouse://127.0.0.1/test_database?max_compression_buffer=1024", @@ -307,7 +313,7 @@ func TestParseDSN(t *testing.T) { if testCase.expectedErr != "" { assert.Nil(t, opts) - assert.Error(t, err, testCase.expectedErr) + assert.EqualError(t, err, testCase.expectedErr) return } From e065e76199f6c9b69a1ea5cf08ca4451cc7a232b Mon Sep 17 00:00:00 2001 From: Kuba Kaflik Date: Fri, 16 Dec 2022 21:50:10 +0100 Subject: [PATCH 13/13] license --- clickhouse_options_test.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/clickhouse_options_test.go b/clickhouse_options_test.go index 34b0a4dd40..3c65d0deed 100644 --- a/clickhouse_options_test.go +++ b/clickhouse_options_test.go @@ -1,3 +1,20 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package clickhouse import (