diff --git a/drainer/util.go b/drainer/util.go index 1d510de14..b86a50030 100644 --- a/drainer/util.go +++ b/drainer/util.go @@ -36,7 +36,9 @@ import ( const ( maxKafkaMsgSize = 1 << 30 - maxGrpcMsgSize = int(^uint(0) >> 1) + // max grpc message size, leave 4MB as buffer. Because when grpc decompresses messages, it will leave a few buffer + // for this, which overflows the int64: https://github.com/grpc/grpc-go/blob/v1.44.0/rpc_util.go#L742 + maxGrpcMsgSize = int(^uint(0)>>1) - 4*1024*1024 ) // taskGroup is a wrapper of `sync.WaitGroup`. diff --git a/pump/config.go b/pump/config.go index ced2cfdd6..313cc6a9b 100644 --- a/pump/config.go +++ b/pump/config.go @@ -32,10 +32,12 @@ import ( ) const ( - defaultEtcdDialTimeout = 5 * time.Second - defaultEtcdURLs = "http://127.0.0.1:2379" - defaultListenAddr = "127.0.0.1:8250" - defaultMaxMsgSize = int(^uint(0) >> 1) // max grpc message size + defaultEtcdDialTimeout = 5 * time.Second + defaultEtcdURLs = "http://127.0.0.1:2379" + defaultListenAddr = "127.0.0.1:8250" + // max grpc message size, leave 4MB as buffer. Because when grpc decompresses messages, it will leave a few buffer + // for this, which overflows the int64: https://github.com/grpc/grpc-go/blob/v1.44.0/rpc_util.go#L742 + defaultMaxMsgSize = int(^uint(0)>>1) - 4*1024*1024 defaultHeartbeatInterval = 2 defaultGC = "7" defaultDataDir = "data.pump" diff --git a/tests/cache_table/run.sh b/tests/cache_table/run.sh new file mode 100644 index 000000000..48ae47eda --- /dev/null +++ b/tests/cache_table/run.sh @@ -0,0 +1,28 @@ +#!/bin/sh + +set -e + +cd "$(dirname "$0")" + +run_drainer --compressor gzip & + +sleep 3 + +run_sql 'CREATE DATABASE cache_test;' +run_sql 'CREATE TABLE cache_test.t1( a int, b varchar(128));' +run_sql "INSERT INTO cache_test.t1 (a,b) VALUES(1,'a'),(2,'b'),(3,'c'),(4,'d');" +run_sql "ALTER TABLE cache_test.t1 CACHE;" +run_sql "INSERT INTO cache_test.t1 (a,b) VALUES(5,'e'),(6,'f'),(7,'g'),(8,'h');" +run_sql "ALTER TABLE cache_test.t1 NOCACHE" + +sleep 3 + +down_run_sql 'SELECT a, b FROM cache_test.t1 order by a' +check_contains 'a: 7' +check_contains 'b: g' +check_contains 'a: 8' +check_contains 'b: h' + +run_sql 'DROP DATABASE cache_test' + +killall drainer