Skip to content

Commit

Permalink
removed MinBufferSize config
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzhichang committed May 17, 2021
1 parent 33d0c44 commit 1208a4b
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 28 deletions.
17 changes: 8 additions & 9 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,15 @@ type TaskConfig struct {

FlushInterval int `json:"flushInterval,omitempty"`
BufferSize int `json:"bufferSize,omitempty"`
MinBufferSize int `json:"minBufferSize,omitempty"`
MsgSizeHint int `json:"msgSizeHint,omitempty"`
TimeZone string `json:"timezone"`
}

const (
defaultFlushInterval = 3
defaultBufferSize = 1 << 20 //1048576
defaultMinBufferSize = 1 << 14 // 16384
maxFlushInterval = 10
defaultFlushInterval = 5
MaxBufferSize = 1 << 20 //1048576
defaultBufferSize = 1 << 18 //262144
defaultMsgSizeHint = 1000
defaultTimeZone = "Local"
defaultLogLevel = "info"
Expand Down Expand Up @@ -214,17 +214,16 @@ func (cfg *Config) Normallize() (err error) {

if cfg.Task.FlushInterval <= 0 {
cfg.Task.FlushInterval = defaultFlushInterval
} else if cfg.Task.FlushInterval > maxFlushInterval {
cfg.Task.FlushInterval = maxFlushInterval
}
if cfg.Task.BufferSize <= 0 {
cfg.Task.BufferSize = defaultBufferSize
} else if cfg.Task.BufferSize > MaxBufferSize {
cfg.Task.BufferSize = MaxBufferSize
} else {
cfg.Task.BufferSize = 1 << util.GetShift(cfg.Task.BufferSize)
}
if cfg.Task.MinBufferSize <= 0 {
cfg.Task.MinBufferSize = defaultMinBufferSize
} else {
cfg.Task.MinBufferSize = 1 << util.GetShift(cfg.Task.MinBufferSize)
}
if cfg.Task.MsgSizeHint <= 0 {
cfg.Task.MsgSizeHint = defaultMsgSizeHint
}
Expand Down
12 changes: 5 additions & 7 deletions docs/configuration/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,23 +121,21 @@
// shardingPolicy is `stripe,<interval>`(requires ShardingKey be numerical) or `hash`(requires ShardingKey be string)
"shardingPolicy": "",
// interval of flushing the batch
// interval of flushing the batch. Default to 5, max to 10.
"flushInterval": 5,
// batch size to insert into clickhouse. sinker will round upward it to the the nearest 2^n.
// batch size to insert into clickhouse. sinker will round it upward to the nearest 2^n. Default to 262144, max to 1048576.
"bufferSize": 90000,
// min batch size to insert into clickhouse. sinker will round upward it to the the nearest 2^n.
"minBufferSize": 1,
// estimated avg message size. kafka-go needs this to determize receive buffer size. default to 1000.
// estimated avg message size. kafka-go needs this to determine receive buffer size. Default to 1000.
"msgSizeHint": 1000,
// In the absence of time zone information, interprets the time as in the given location. Default to "Local" (aka /etc/localtime of the machine on which sinker runs)
"timezone": ""
},
// log level, possible value: "debug", "info", "warn", "error", "dpanic", "panic", "fatal"
// log level, possible value: "debug", "info", "warn", "error", "dpanic", "panic", "fatal". Default to "info".
"logLevel": "debug",
// log output paths, possible value: "stdout", "stderr", relative file path, absoute file path. Log files will be rotated every 100MB, keep 10 old ones.
// log output paths, possible value: "stdout", "stderr", relative file path, absolute file path. Log files will be rotated every 100MB, keep 10 old ones. Default to ["stdout"].
"logPaths": ["stdout", "test_dynamic_schema.log"]
}
```
4 changes: 2 additions & 2 deletions go.test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ timeout 30 ./clickhouse_sinker --local-cfg-file docker/test_fixed_schema.json
timeout 30 ./clickhouse_sinker --local-cfg-file docker/test_auto_schema.json
timeout 60 ./clickhouse_sinker --local-cfg-file docker/test_dynamic_schema.json

echo "check result"
echo "check result 1"
count=`curl "localhost:8123" -d 'select count() from test_fixed_schema'`
echo "Got test_fixed_schema count => $count"
[ $count -eq 100000 ] || exit 1
Expand Down Expand Up @@ -88,7 +88,7 @@ timeout 30 ./clickhouse_sinker --nacos-addr 127.0.0.1:8848 --nacos-username naco
timeout 30 ./clickhouse_sinker --nacos-addr 127.0.0.1:8848 --nacos-username nacos --nacos-password nacos --nacos-dataid test_auto_schema
timeout 30 ./clickhouse_sinker --nacos-addr 127.0.0.1:8848 --nacos-username nacos --nacos-password nacos --nacos-dataid test_dynamic_schema

echo "check result"
echo "check result 2"
count=`curl "localhost:8123" -d 'select count() from test_fixed_schema'`
echo "Got test_fixed_schema count => $count"
[ $count -eq 100000 ] || exit 1
Expand Down
4 changes: 2 additions & 2 deletions input/kafka_go.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ func (k *KafkaGo) Init(cfg *config.Config, taskName string, putFn func(msg model
GroupID: k.cfg.Task.ConsumerGroup,
Topic: k.cfg.Task.Topic,
StartOffset: offset,
MinBytes: k.cfg.Task.MinBufferSize * k.cfg.Task.MsgSizeHint,
MinBytes: (k.cfg.Task.BufferSize / 2) * k.cfg.Task.MsgSizeHint,
MaxBytes: k.cfg.Task.BufferSize * k.cfg.Task.MsgSizeHint,
MaxWait: time.Duration(k.cfg.Task.FlushInterval) * time.Second,
MaxWait: time.Duration(3) * time.Second,
CommitInterval: time.Second, // flushes commits to Kafka every second
}
if kfkCfg.TLS.CaCertFiles == "" && kfkCfg.TLS.TrustStoreLocation != "" {
Expand Down
2 changes: 1 addition & 1 deletion input/kafka_sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (k *KafkaSarama) Init(cfg *config.Config, taskName string, putFn func(msg m
if taskCfg.Earliest {
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}
config.ChannelBufferSize = taskCfg.MinBufferSize
config.ChannelBufferSize = 1024
cg, err := sarama.NewConsumerGroup(strings.Split(kfkCfg.Brokers, ","), taskCfg.ConsumerGroup, config)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,9 +630,9 @@ func TestParseInt(t *testing.T) {
uvAct = uint32(uv)
case 64:
ivExp = i64Exp[i]
ivAct = int64(iv)
ivAct = iv
uvExp = u64Exp[i]
uvAct = uint64(uv)
uvAct = uv
}
desc = fmt.Sprintf(`ParseInt("%s", 10, %d)=%d(%v)`, s, bitSize, iv, errors.Unwrap(ivErr))
require.Equal(t, ivExp, ivAct, desc)
Expand Down
7 changes: 2 additions & 5 deletions pool/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,14 @@ import (
"sync"
"time"

"github.com/housepower/clickhouse_sinker/config"
"github.com/housepower/clickhouse_sinker/health"
"github.com/housepower/clickhouse_sinker/util"
"github.com/pkg/errors"
"github.com/troian/healthcheck"
"go.uber.org/zap"
)

const (
BlockSize = 1 << 21 //2097152, two times of the default value
)

var (
lock sync.Mutex
clusterConn []*ShardConn
Expand Down Expand Up @@ -125,7 +122,7 @@ func InitClusterConn(hosts [][]string, port int, db, username, password, dsnPara
// Each shard has a *sql.DB which connects to one replica inside the shard.
// "alt_hosts" tolerates replica single-point-failure. However more flexable switching is needed for some cases for example https://github.com/ClickHouse/ClickHouse/issues/24036.
dsnTmpl = "tcp://%s" + fmt.Sprintf("?database=%s&username=%s&password=%s&block_size=%d",
url.QueryEscape(db), url.QueryEscape(username), url.QueryEscape(password), BlockSize)
url.QueryEscape(db), url.QueryEscape(username), url.QueryEscape(password), 2*config.MaxBufferSize)
if dsnParams != "" {
dsnTmpl += "&" + dsnParams
}
Expand Down

0 comments on commit 1208a4b

Please sign in to comment.