Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buffered compression column by column for native protocol #808

Merged
merged 16 commits into from
Dec 16, 2022
Merged
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ Support for the ClickHouse protocol advanced features using `Context`:
ConnMaxLifetime: time.Duration(10) * time.Minute,
ConnOpenStrategy: clickhouse.ConnOpenInOrder,
BlockBufferSize: 10,
MaxCompressionBuffer: 10240,
})
if err != nil {
return err
Expand Down Expand Up @@ -117,6 +118,7 @@ conn := clickhouse.OpenDB(&clickhouse.Options{
},
Debug: true,
BlockBufferSize: 10,
MaxCompressionBuffer: 10240,
})
conn.SetMaxIdleConns(5)
conn.SetMaxOpenConns(10)
Expand All @@ -141,6 +143,7 @@ conn.SetConnMaxLifetime(time.Hour)
- `zstd`, `lz4` - ignored
* block_buffer_size - size of block buffer (default 2)
* read_timeout - a duration string is a possibly signed sequence of decimal numbers, each with optional fraction and a unit suffix such as "300ms", "1s". Valid time units are "ms", "s", "m" (default 5m).
* max_compression_buffer - max size (bytes) of compression buffer during column by column compression (default 10MiB)

SSL/TLS parameters:

Expand Down Expand Up @@ -186,7 +189,7 @@ conn := clickhouse.OpenDB(&clickhouse.Options{

## Compression

ZSTD/LZ4 compression is supported over native and http protocols. This is performed at a block level and is only used for inserts.
ZSTD/LZ4 compression is supported over native and http protocols. This is performed column by column at a block level and is only used for inserts. Compression buffer size is set as `MaxCompressionBuffer` option.

If using `Open` via the std interface and specifying a DSN, compression can be enabled via the `compress` flag. Currently, this is a boolean flag which enables `LZ4` compression.

Expand Down
18 changes: 18 additions & 0 deletions benchmark/v2/read-native/basic_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
// Licensed to ClickHouse, Inc. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. ClickHouse, Inc. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package main

import (
Expand Down Expand Up @@ -32,6 +49,7 @@ func getConnection() clickhouse.Conn {
}
return conn
}

func BenchmarkRead(b *testing.B) {
b.Run("string", benchmarkStringRead)
b.Run("random", benchmarkRandom)
Expand Down
149 changes: 149 additions & 0 deletions benchmark/v2/write-compress-buffer-limit/write_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Licensed to ClickHouse, Inc. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. ClickHouse, Inc. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package main

import (
"context"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
"log"
"runtime"
"testing"
"time"
)

func bToMb(b uint64) uint64 {
return b / 1024 / 1024
}

func PrintMemUsage() {
var m runtime.MemStats
runtime.ReadMemStats(&m)
fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc))
fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc))
fmt.Printf("\tSys = %v MiB", bToMb(m.Sys))
fmt.Printf("\tNumGC = %v\n", m.NumGC)
}

func getConnection(maxCompressionBuffer int) clickhouse.Conn {
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{"127.0.0.1:9000"},
Auth: clickhouse.Auth{
Database: "default",
Username: "default",
//Password: "ClickHouse",
},
//Debug: true,
DialTimeout: time.Second,
MaxOpenConns: 10,
MaxIdleConns: 5,
ConnMaxLifetime: time.Hour,
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
},
BlockBufferSize: 100,
MaxCompressionBuffer: maxCompressionBuffer,
})
if err != nil {
log.Fatal(err)
}
return conn
}

func BenchmarkWrite1KB(b *testing.B) {
benchmarkCompressionBufferLimitedWrite(b, 1024)
}

func BenchmarkWrite16KB(b *testing.B) {
benchmarkCompressionBufferLimitedWrite(b, 1024*16)
}

func BenchmarkWrite64KB(b *testing.B) {
benchmarkCompressionBufferLimitedWrite(b, 1024*64)
}

func BenchmarkWrite256KB(b *testing.B) {
benchmarkCompressionBufferLimitedWrite(b, 1024*256)
}

func BenchmarkWrite512KB(b *testing.B) {
benchmarkCompressionBufferLimitedWrite(b, 1024*512)
}

func BenchmarkWrite1MB(b *testing.B) {
benchmarkCompressionBufferLimitedWrite(b, 1024*1024)
}

func BenchmarkWrite5MB(b *testing.B) {
benchmarkCompressionBufferLimitedWrite(b, 1024*1024*5)
}

func BenchmarkWrite10MB(b *testing.B) {
benchmarkCompressionBufferLimitedWrite(b, 1024*1024*10)
}

func benchmarkCompressionBufferLimitedWrite(b *testing.B, maxCompressionBuffer int) {
fmt.Sprintf("max compression buffer= %dB", maxCompressionBuffer)

go func() {
for {
PrintMemUsage()
time.Sleep(time.Second)
}
}()

conn := getConnection(maxCompressionBuffer)

if err := conn.Exec(context.Background(), "DROP TABLE IF EXISTS benchmark"); err != nil {
b.Fatal(err)
}
const ddl = `
CREATE TABLE benchmark (
Col1 UInt64
, Col2 String
, Col3 Array(UInt8)
, Col4 DateTime
) Engine Null
`

if err := conn.Exec(context.Background(), ddl); err != nil {
b.Fatal(err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
batch, err := conn.PrepareBatch(context.Background(), "INSERT INTO benchmark")
if err != nil {
b.Fatal(err)
}
for c := 0; c < 10_000_000; c++ {
err := batch.Append(
uint64(i),
"Golang SQL database driver",
[]uint8{1, 2, 3, 4, 5, 6, 7, 8, 9},
time.Now(),
)
if err != nil {
b.Fatal(err)
}
}

if err := batch.Send(); err != nil {
b.Fatal(err)
}
}
}
46 changes: 31 additions & 15 deletions clickhouse_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"crypto/tls"
"fmt"
"github.com/ClickHouse/ch-go/compress"
"github.com/pkg/errors"
"net"
"net/url"
"strconv"
Expand Down Expand Up @@ -116,21 +117,22 @@ func ParseDSN(dsn string) (*Options, error) {
type Options struct {
Protocol Protocol

TLS *tls.Config
Addr []string
Auth Auth
DialContext func(ctx context.Context, addr string) (net.Conn, error)
Debug bool
Debugf func(format string, v ...interface{}) // only works when Debug is true
Settings Settings
Compression *Compression
DialTimeout time.Duration // default 1 second
MaxOpenConns int // default MaxIdleConns + 5
MaxIdleConns int // default 5
ConnMaxLifetime time.Duration // default 1 hour
ConnOpenStrategy ConnOpenStrategy
HttpHeaders map[string]string // set additional headers on HTTP requests
BlockBufferSize uint8 // default 2 - can be overwritten on query
TLS *tls.Config
Addr []string
Auth Auth
DialContext func(ctx context.Context, addr string) (net.Conn, error)
Debug bool
Debugf func(format string, v ...interface{}) // only works when Debug is true
Settings Settings
Compression *Compression
DialTimeout time.Duration // default 1 second
MaxOpenConns int // default MaxIdleConns + 5
MaxIdleConns int // default 5
ConnMaxLifetime time.Duration // default 1 hour
ConnOpenStrategy ConnOpenStrategy
HttpHeaders map[string]string // set additional headers on HTTP requests
BlockBufferSize uint8 // default 2 - can be overwritten on query
MaxCompressionBuffer int // default 10485760 - measured in bytes i.e. 10MiB

scheme string
ReadTimeout time.Duration
Expand All @@ -141,6 +143,11 @@ func (o *Options) fromDSN(in string) error {
if err != nil {
return err
}

if dsn.Host == "" {
return errors.New("parse dsn address failed")
}

if o.Settings == nil {
o.Settings = make(Settings)
}
Expand Down Expand Up @@ -190,6 +197,12 @@ func (o *Options) fromDSN(in string) error {
} else {
return err
}
case "max_compression_buffer":
max, err := strconv.Atoi(params.Get(v))
if err != nil {
return errors.Wrap(err, "max_compression_buffer invalid value")
}
o.MaxCompressionBuffer = max
case "dial_timeout":
duration, err := time.ParseDuration(params.Get(v))
if err != nil {
Expand Down Expand Up @@ -291,6 +304,9 @@ func (o Options) setDefaults() *Options {
if o.BlockBufferSize <= 0 {
o.BlockBufferSize = 2
}
if o.MaxCompressionBuffer <= 0 {
o.MaxCompressionBuffer = 10485760
}
if o.Addr == nil || len(o.Addr) == 0 {
switch o.Protocol {
case Native:
Expand Down
Loading