From 0f1058d9b0dd3cc6c1cc5990df3c3d592b97262f Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Mon, 15 May 2023 11:56:46 -0600 Subject: [PATCH 01/12] Release write buffer after Flush() in transport --- internal/transport/http_util.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 19cbb18f5ab4..8bb810a3e0a9 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -318,7 +318,6 @@ type bufWriter struct { func newBufWriter(conn net.Conn, batchSize int) *bufWriter { return &bufWriter{ - buf: make([]byte, batchSize*2), batchSize: batchSize, conn: conn, } @@ -332,6 +331,9 @@ func (w *bufWriter) Write(b []byte) (n int, err error) { n, err = w.conn.Write(b) return n, toIOError(err) } + if w.buf == nil { + w.buf = make([]byte, w.batchSize*2) + } for len(b) > 0 { nn := copy(w.buf[w.offset:], b) b = b[nn:] @@ -345,6 +347,12 @@ func (w *bufWriter) Write(b []byte) (n int, err error) { } func (w *bufWriter) Flush() error { + err := w.flush() + w.buf = nil + return err +} + +func (w *bufWriter) flush() error { if w.err != nil { return w.err } From 63f4ba34b4769f40ebe174c64d8ec8bd90a32ef8 Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Mon, 15 May 2023 14:41:45 -0600 Subject: [PATCH 02/12] Use sync.Pool --- internal/transport/http_util.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 8bb810a3e0a9..7cea0bd95141 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -30,6 +30,7 @@ import ( "net/url" "strconv" "strings" + "sync" "time" "unicode/utf8" @@ -332,7 +333,7 @@ func (w *bufWriter) Write(b []byte) (n int, err error) { return n, toIOError(err) } if w.buf == nil { - w.buf = make([]byte, w.batchSize*2) + w.buf = pool.Get().([]byte) } for len(b) > 0 { nn := copy(w.buf[w.offset:], b) @@ -348,6 +349,7 @@ func (w *bufWriter) Write(b []byte) (n int, err error) { func (w *bufWriter) Flush() error { err := w.flush() + pool.Put(w.buf) w.buf = nil return err } @@ -389,6 +391,9 @@ type framer struct { fr *http2.Framer } +var pool *sync.Pool +var mutex sync.Mutex + func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderListSize uint32) *framer { if writeBufferSize < 0 { writeBufferSize = 0 @@ -397,6 +402,15 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList if readBufferSize > 0 { r = bufio.NewReaderSize(r, readBufferSize) } + mutex.Lock() + if pool == nil { + pool = &sync.Pool{ + New: func() any { + return make([]byte, writeBufferSize*2) + }, + } + } + mutex.Unlock() w := newBufWriter(conn, writeBufferSize) f := &framer{ writer: w, From 4faff38471670645ccdb8fcd0388ac02553f9d60 Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Mon, 15 May 2023 14:44:52 -0600 Subject: [PATCH 03/12] Use interface{} instead of any --- internal/transport/http_util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 7cea0bd95141..095307091e51 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -405,7 +405,7 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList mutex.Lock() if pool == nil { pool = &sync.Pool{ - New: func() any { + New: func() interface{} { return make([]byte, writeBufferSize*2) }, } From f594b2904c32432494da51a86a6c1f7c212c5dd7 Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Mon, 15 May 2023 21:45:01 -0600 Subject: [PATCH 04/12] use pool map --- internal/transport/http_util.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 095307091e51..a04d8211da38 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -310,6 +310,7 @@ func decodeGrpcMessageUnchecked(msg string) string { } type bufWriter struct { + pool *sync.Pool buf []byte offset int batchSize int @@ -317,10 +318,11 @@ type bufWriter struct { err error } -func newBufWriter(conn net.Conn, batchSize int) *bufWriter { +func newBufWriter(conn net.Conn, batchSize int, pool *sync.Pool) *bufWriter { return &bufWriter{ batchSize: batchSize, conn: conn, + pool: pool, } } @@ -333,7 +335,7 @@ func (w *bufWriter) Write(b []byte) (n int, err error) { return n, toIOError(err) } if w.buf == nil { - w.buf = pool.Get().([]byte) + w.buf = w.pool.Get().([]byte) } for len(b) > 0 { nn := copy(w.buf[w.offset:], b) @@ -349,7 +351,7 @@ func (w *bufWriter) Write(b []byte) (n int, err error) { func (w *bufWriter) Flush() error { err := w.flush() - pool.Put(w.buf) + w.pool.Put(w.buf) w.buf = nil return err } @@ -391,7 +393,7 @@ type framer struct { fr *http2.Framer } -var pool *sync.Pool +var poolMap map[int]*sync.Pool = make(map[int]*sync.Pool) var mutex sync.Mutex func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderListSize uint32) *framer { @@ -403,15 +405,17 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList r = bufio.NewReaderSize(r, readBufferSize) } mutex.Lock() - if pool == nil { + pool, ok := poolMap[writeBufferSize*2] + if !ok { pool = &sync.Pool{ New: func() interface{} { return make([]byte, writeBufferSize*2) }, } } + poolMap[writeBufferSize*2] = pool mutex.Unlock() - w := newBufWriter(conn, writeBufferSize) + w := newBufWriter(conn, writeBufferSize, pool) f := &framer{ writer: w, fr: http2.NewFramer(w, r), From 95bde71db6e9abbfe38e6b2eaabcf2460923c809 Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Tue, 16 May 2023 11:07:31 -0600 Subject: [PATCH 05/12] fix flush() --- internal/transport/http_util.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index a04d8211da38..0decb8cd28cc 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -343,7 +343,7 @@ func (w *bufWriter) Write(b []byte) (n int, err error) { w.offset += nn n += nn if w.offset >= w.batchSize { - err = w.Flush() + err = w.flush() } } return n, err @@ -351,8 +351,10 @@ func (w *bufWriter) Write(b []byte) (n int, err error) { func (w *bufWriter) Flush() error { err := w.flush() - w.pool.Put(w.buf) - w.buf = nil + if w.buf != nil { + w.pool.Put(w.buf) + w.buf = nil + } return err } @@ -412,8 +414,8 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList return make([]byte, writeBufferSize*2) }, } + poolMap[writeBufferSize*2] = pool } - poolMap[writeBufferSize*2] = pool mutex.Unlock() w := newBufWriter(conn, writeBufferSize, pool) f := &framer{ From a7ededc1b11662f4bf0a478769648443a94da697 Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Wed, 17 May 2023 13:44:15 -0600 Subject: [PATCH 06/12] Fix linter error --- internal/transport/http_util.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 0decb8cd28cc..e99c08429229 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -335,10 +335,11 @@ func (w *bufWriter) Write(b []byte) (n int, err error) { return n, toIOError(err) } if w.buf == nil { - w.buf = w.pool.Get().([]byte) + b := w.pool.Get().(*[]byte) + w.buf = *b } for len(b) > 0 { - nn := copy(w.buf[w.offset:], b) + nn := copy((w.buf)[w.offset:], b) b = b[nn:] w.offset += nn n += nn @@ -352,7 +353,8 @@ func (w *bufWriter) Write(b []byte) (n int, err error) { func (w *bufWriter) Flush() error { err := w.flush() if w.buf != nil { - w.pool.Put(w.buf) + b := w.buf + w.pool.Put(&b) w.buf = nil } return err @@ -411,7 +413,8 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList if !ok { pool = &sync.Pool{ New: func() interface{} { - return make([]byte, writeBufferSize*2) + b := make([]byte, writeBufferSize*2) + return &b }, } poolMap[writeBufferSize*2] = pool From 16345bbf40125ec37bed3a12d6064ed098a20800 Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Mon, 22 May 2023 18:39:15 -0600 Subject: [PATCH 07/12] make variable names more descripptive --- internal/transport/http_util.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index e99c08429229..a95ab4c9643c 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -397,8 +397,8 @@ type framer struct { fr *http2.Framer } -var poolMap map[int]*sync.Pool = make(map[int]*sync.Pool) -var mutex sync.Mutex +var writeBufferPoolMap map[int]*sync.Pool = make(map[int]*sync.Pool) +var writeBufferMutex sync.Mutex func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderListSize uint32) *framer { if writeBufferSize < 0 { @@ -408,18 +408,19 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList if readBufferSize > 0 { r = bufio.NewReaderSize(r, readBufferSize) } - mutex.Lock() - pool, ok := poolMap[writeBufferSize*2] + writeBufferMutex.Lock() + size := writeBufferSize * 2 + pool, ok := writeBufferPoolMap[size] if !ok { pool = &sync.Pool{ New: func() interface{} { - b := make([]byte, writeBufferSize*2) + b := make([]byte, size) return &b }, } - poolMap[writeBufferSize*2] = pool + writeBufferPoolMap[size] = pool } - mutex.Unlock() + writeBufferMutex.Unlock() w := newBufWriter(conn, writeBufferSize, pool) f := &framer{ writer: w, From e1bdb20a74cd63a7c753c24f9e2ec135ec44f265 Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Wed, 28 Jun 2023 14:00:28 -0600 Subject: [PATCH 08/12] Add options to enable write buffer sharing --- benchmark/benchmain/main.go | 11 ++++++ benchmark/stats/stats.go | 9 +++-- dialoptions.go | 9 +++++ internal/transport/http2_client.go | 2 +- internal/transport/http2_server.go | 2 +- internal/transport/http_util.go | 50 ++++++++++++++++++---------- internal/transport/keepalive_test.go | 2 +- internal/transport/transport.go | 3 ++ server.go | 11 ++++++ 9 files changed, 77 insertions(+), 22 deletions(-) diff --git a/benchmark/benchmain/main.go b/benchmark/benchmain/main.go index 1366c18c972b..c4ce2e60833b 100644 --- a/benchmark/benchmain/main.go +++ b/benchmark/benchmain/main.go @@ -113,6 +113,8 @@ var ( sleepBetweenRPCs = flags.DurationSlice("sleepBetweenRPCs", []time.Duration{0}, "Configures the maximum amount of time the client should sleep between consecutive RPCs - may be a a comma-separated list") connections = flag.Int("connections", 1, "The number of connections. Each connection will handle maxConcurrentCalls RPC streams") recvBufferPool = flags.StringWithAllowedValues("recvBufferPool", recvBufferPoolNil, "Configures the shared receive buffer pool. One of: nil, simple, all", allRecvBufferPools) + shareWriteBuffer = flags.StringWithAllowedValues("shareWriteBuffer", toggleModeOff, + fmt.Sprintf("Configures both client and server to share write buffer - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) logger = grpclog.Component("benchmark") ) @@ -333,6 +335,10 @@ func makeClients(bf stats.Features) ([]testgrpc.BenchmarkServiceClient, func()) if bf.ServerReadBufferSize >= 0 { sopts = append(sopts, grpc.ReadBufferSize(bf.ServerReadBufferSize)) } + if bf.ShareWriteBuffer { + opts = append(opts, grpc.WithShareWriteBuffer(true)) + sopts = append(sopts, grpc.ShareWriteBuffer(true)) + } if bf.ServerWriteBufferSize >= 0 { sopts = append(sopts, grpc.WriteBufferSize(bf.ServerWriteBufferSize)) } @@ -589,6 +595,7 @@ type featureOpts struct { serverWriteBufferSize []int sleepBetweenRPCs []time.Duration recvBufferPools []string + shareWriteBuffer []bool } // makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each @@ -637,6 +644,8 @@ func makeFeaturesNum(b *benchOpts) []int { featuresNum[i] = len(b.features.sleepBetweenRPCs) case stats.RecvBufferPool: featuresNum[i] = len(b.features.recvBufferPools) + case stats.ShareWriteBuffer: + featuresNum[i] = len(b.features.shareWriteBuffer) default: log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, stats.MaxFeatureIndex) } @@ -706,6 +715,7 @@ func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features { ServerWriteBufferSize: b.features.serverWriteBufferSize[curPos[stats.ServerWriteBufferSize]], SleepBetweenRPCs: b.features.sleepBetweenRPCs[curPos[stats.SleepBetweenRPCs]], RecvBufferPool: b.features.recvBufferPools[curPos[stats.RecvBufferPool]], + ShareWriteBuffer: b.features.shareWriteBuffer[curPos[stats.ShareWriteBuffer]], } if len(b.features.reqPayloadCurves) == 0 { f.ReqSizeBytes = b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]] @@ -779,6 +789,7 @@ func processFlags() *benchOpts { serverWriteBufferSize: append([]int(nil), *serverWriteBufferSize...), sleepBetweenRPCs: append([]time.Duration(nil), *sleepBetweenRPCs...), recvBufferPools: setRecvBufferPool(*recvBufferPool), + shareWriteBuffer: setToggleMode(*shareWriteBuffer), }, } diff --git a/benchmark/stats/stats.go b/benchmark/stats/stats.go index 3989e25dbf4b..5cf82a7dcb82 100644 --- a/benchmark/stats/stats.go +++ b/benchmark/stats/stats.go @@ -58,6 +58,7 @@ const ( ServerWriteBufferSize SleepBetweenRPCs RecvBufferPool + ShareWriteBuffer // MaxFeatureIndex is a place holder to indicate the total number of feature // indices we have. Any new feature indices should be added above this. @@ -129,6 +130,8 @@ type Features struct { SleepBetweenRPCs time.Duration // RecvBufferPool represents the shared recv buffer pool used. RecvBufferPool string + // ShareWriteBuffer configures whether both client and server share per-connection write buffer + ShareWriteBuffer bool } // String returns all the feature values as a string. @@ -148,13 +151,13 @@ func (f Features) String() string { "trace_%v-latency_%v-kbps_%v-MTU_%v-maxConcurrentCalls_%v-%s-%s-"+ "compressor_%v-channelz_%v-preloader_%v-clientReadBufferSize_%v-"+ "clientWriteBufferSize_%v-serverReadBufferSize_%v-serverWriteBufferSize_%v-"+ - "sleepBetweenRPCs_%v-connections_%v-recvBufferPool_%v-", + "sleepBetweenRPCs_%v-connections_%v-recvBufferPool_%v-shareWriteBuffer_%v", f.NetworkMode, f.UseBufConn, f.EnableKeepalive, f.BenchTime, f.EnableTrace, f.Latency, f.Kbps, f.MTU, f.MaxConcurrentCalls, reqPayloadString, respPayloadString, f.ModeCompressor, f.EnableChannelz, f.EnablePreloader, f.ClientReadBufferSize, f.ClientWriteBufferSize, f.ServerReadBufferSize, f.ServerWriteBufferSize, f.SleepBetweenRPCs, f.Connections, - f.RecvBufferPool) + f.RecvBufferPool, f.ShareWriteBuffer) } // SharedFeatures returns the shared features as a pretty printable string. @@ -230,6 +233,8 @@ func (f Features) partialString(b *bytes.Buffer, wantFeatures []bool, sep, delim b.WriteString(fmt.Sprintf("SleepBetweenRPCs%v%v%v", sep, f.SleepBetweenRPCs, delim)) case RecvBufferPool: b.WriteString(fmt.Sprintf("RecvBufferPool%v%v%v", sep, f.RecvBufferPool, delim)) + case ShareWriteBuffer: + b.WriteString(fmt.Sprintf("ShareWriteBuffer%v%v%v", sep, f.ShareWriteBuffer, delim)) default: log.Fatalf("Unknown feature index %v. maxFeatureIndex is %v", i, MaxFeatureIndex) } diff --git a/dialoptions.go b/dialoptions.go index 23ea95237ea0..e87744dbefaa 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -139,6 +139,15 @@ func newJoinDialOption(opts ...DialOption) DialOption { return &joinDialOption{opts: opts} } +// WithShareWriteBuffer allows reusing per-connection transport write buffer. +// If this option is set to true every connection will release the buffer after +// flushing the data on the wire. +func WithShareWriteBuffer(val bool) DialOption { + return newFuncDialOption(func(o *dialOptions) { + o.copts.ShareWriteBuffer = val + }) +} + // WithWriteBufferSize determines how much data can be batched before doing a // write on the wire. The corresponding memory allocation for this buffer will // be twice the size to keep syscalls low. The default value for this buffer is diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 326bf0848000..19bfcc0cca92 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -330,7 +330,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts readerDone: make(chan struct{}), writerDone: make(chan struct{}), goAway: make(chan struct{}), - framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize), + framer: newFramer(conn, writeBufSize, readBufSize, opts.ShareWriteBuffer, maxHeaderListSize), fc: &trInFlow{limit: uint32(icwz)}, scheme: scheme, activeStreams: make(map[uint32]*Stream), diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index f9606401289d..9e5da31d30c1 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -165,7 +165,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, if config.MaxHeaderListSize != nil { maxHeaderListSize = *config.MaxHeaderListSize } - framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize) + framer := newFramer(conn, writeBufSize, readBufSize, config.ShareWriteBuffer, maxHeaderListSize) // Send initial settings as connection preface to client. isettings := []http2.Setting{{ ID: http2.SettingMaxFrameSize, diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index a95ab4c9643c..f28af446528d 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -319,11 +319,16 @@ type bufWriter struct { } func newBufWriter(conn net.Conn, batchSize int, pool *sync.Pool) *bufWriter { - return &bufWriter{ + w := &bufWriter{ batchSize: batchSize, conn: conn, pool: pool, } + // this indicates that we should use non shared buf + if pool == nil { + w.buf = make([]byte, batchSize) + } + return w } func (w *bufWriter) Write(b []byte) (n int, err error) { @@ -339,7 +344,7 @@ func (w *bufWriter) Write(b []byte) (n int, err error) { w.buf = *b } for len(b) > 0 { - nn := copy((w.buf)[w.offset:], b) + nn := copy(w.buf[w.offset:], b) b = b[nn:] w.offset += nn n += nn @@ -352,7 +357,8 @@ func (w *bufWriter) Write(b []byte) (n int, err error) { func (w *bufWriter) Flush() error { err := w.flush() - if w.buf != nil { + // Only release the buffer if we are in a "shared" mode + if w.buf != nil && w.pool != nil { b := w.buf w.pool.Put(&b) w.buf = nil @@ -399,8 +405,10 @@ type framer struct { var writeBufferPoolMap map[int]*sync.Pool = make(map[int]*sync.Pool) var writeBufferMutex sync.Mutex +var readBufferPoolMap map[int]*sync.Pool = make(map[int]*sync.Pool) +var readBufferMutex sync.Mutex -func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderListSize uint32) *framer { +func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, shareWriteBuffer bool, maxHeaderListSize uint32) *framer { if writeBufferSize < 0 { writeBufferSize = 0 } @@ -408,19 +416,7 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList if readBufferSize > 0 { r = bufio.NewReaderSize(r, readBufferSize) } - writeBufferMutex.Lock() - size := writeBufferSize * 2 - pool, ok := writeBufferPoolMap[size] - if !ok { - pool = &sync.Pool{ - New: func() interface{} { - b := make([]byte, size) - return &b - }, - } - writeBufferPoolMap[size] = pool - } - writeBufferMutex.Unlock() + pool := getWriteBufferPool(writeBufferSize, shareWriteBuffer) w := newBufWriter(conn, writeBufferSize, pool) f := &framer{ writer: w, @@ -435,6 +431,26 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList return f } +func getWriteBufferPool(writeBufferSize int, shareWriteBuffer bool) *sync.Pool { + if !shareWriteBuffer { + return nil + } + writeBufferMutex.Lock() + defer writeBufferMutex.Unlock() + size := writeBufferSize * 2 + pool, ok := writeBufferPoolMap[size] + if !ok { + pool = &sync.Pool{ + New: func() interface{} { + b := make([]byte, size) + return &b + }, + } + writeBufferPoolMap[size] = pool + } + return pool +} + // parseDialTarget returns the network and address to pass to dialer. func parseDialTarget(target string) (string, string) { net := "tcp" diff --git a/internal/transport/keepalive_test.go b/internal/transport/keepalive_test.go index a46bcf020df8..8144277fb6c1 100644 --- a/internal/transport/keepalive_test.go +++ b/internal/transport/keepalive_test.go @@ -191,7 +191,7 @@ func (s) TestKeepaliveServerClosesUnresponsiveClient(t *testing.T) { if n, err := conn.Write(clientPreface); err != nil || n != len(clientPreface) { t.Fatalf("conn.Write(clientPreface) failed: n=%v, err=%v", n, err) } - framer := newFramer(conn, defaultWriteBufSize, defaultReadBufSize, 0) + framer := newFramer(conn, defaultWriteBufSize, defaultReadBufSize, false, 0) if err := framer.fr.WriteSettings(http2.Setting{}); err != nil { t.Fatal("framer.WriteSettings(http2.Setting{}) failed:", err) } diff --git a/internal/transport/transport.go b/internal/transport/transport.go index aa1c896595d9..1f738b8703cc 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -559,6 +559,7 @@ type ServerConfig struct { InitialConnWindowSize int32 WriteBufferSize int ReadBufferSize int + ShareWriteBuffer bool ChannelzParentID *channelz.Identifier MaxHeaderListSize *uint32 HeaderTableSize *uint32 @@ -592,6 +593,8 @@ type ConnectOptions struct { WriteBufferSize int // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall. ReadBufferSize int + // ReadBufferSize indicates whether connections should reuse write buffer + ShareWriteBuffer bool // ChannelzParentID sets the addrConn id which initiate the creation of this client transport. ChannelzParentID *channelz.Identifier // MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received. diff --git a/server.go b/server.go index e076ec7143bb..03b61689eaaf 100644 --- a/server.go +++ b/server.go @@ -170,6 +170,7 @@ type serverOptions struct { initialConnWindowSize int32 writeBufferSize int readBufferSize int + shareWriteBuffer bool connectionTimeout time.Duration maxHeaderListSize *uint32 headerTableSize *uint32 @@ -235,6 +236,15 @@ func newJoinServerOption(opts ...ServerOption) ServerOption { return &joinServerOption{opts: opts} } +// ShareWriteBuffer allows reusing per-connection transport write buffer. +// If this option is set to true every connection will release the buffer after +// flushing the data on the wire. +func ShareWriteBuffer(val bool) ServerOption { + return newFuncServerOption(func(o *serverOptions) { + o.shareWriteBuffer = val + }) +} + // WriteBufferSize determines how much data can be batched before doing a write // on the wire. The corresponding memory allocation for this buffer will be // twice the size to keep syscalls low. The default value for this buffer is @@ -938,6 +948,7 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport { InitialConnWindowSize: s.opts.initialConnWindowSize, WriteBufferSize: s.opts.writeBufferSize, ReadBufferSize: s.opts.readBufferSize, + ShareWriteBuffer: s.opts.shareWriteBuffer, ChannelzParentID: s.channelzID, MaxHeaderListSize: s.opts.maxHeaderListSize, HeaderTableSize: s.opts.headerTableSize, From 9b973274b1ecde9bf468c9087e159c8690d7fbe4 Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Wed, 28 Jun 2023 14:11:48 -0600 Subject: [PATCH 09/12] delete unsed variable --- internal/transport/http_util.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index f28af446528d..63a43c26c3c5 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -405,8 +405,6 @@ type framer struct { var writeBufferPoolMap map[int]*sync.Pool = make(map[int]*sync.Pool) var writeBufferMutex sync.Mutex -var readBufferPoolMap map[int]*sync.Pool = make(map[int]*sync.Pool) -var readBufferMutex sync.Mutex func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, shareWriteBuffer bool, maxHeaderListSize uint32) *framer { if writeBufferSize < 0 { From 00e8de7ca5f3dd7b4198ffaacea7bfa2f2fe3b6a Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Wed, 12 Jul 2023 11:44:51 +0200 Subject: [PATCH 10/12] address review comments --- benchmark/benchmain/main.go | 18 +++++++++--------- benchmark/stats/stats.go | 14 +++++++------- dialoptions.go | 6 +++--- internal/transport/http2_client.go | 2 +- internal/transport/http2_server.go | 2 +- internal/transport/http_util.go | 29 +++++++++++++++-------------- internal/transport/transport.go | 6 +++--- server.go | 10 +++++----- 8 files changed, 44 insertions(+), 43 deletions(-) diff --git a/benchmark/benchmain/main.go b/benchmark/benchmain/main.go index c0b63dc7cf78..76c1a265e50f 100644 --- a/benchmark/benchmain/main.go +++ b/benchmark/benchmain/main.go @@ -115,7 +115,7 @@ var ( sleepBetweenRPCs = flags.DurationSlice("sleepBetweenRPCs", []time.Duration{0}, "Configures the maximum amount of time the client should sleep between consecutive RPCs - may be a a comma-separated list") connections = flag.Int("connections", 1, "The number of connections. Each connection will handle maxConcurrentCalls RPC streams") recvBufferPool = flags.StringWithAllowedValues("recvBufferPool", recvBufferPoolNil, "Configures the shared receive buffer pool. One of: nil, simple, all", allRecvBufferPools) - shareWriteBuffer = flags.StringWithAllowedValues("shareWriteBuffer", toggleModeOff, + sharedWriteBuffer = flags.StringWithAllowedValues("sharedWriteBuffer", toggleModeOff, fmt.Sprintf("Configures both client and server to share write buffer - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) logger = grpclog.Component("benchmark") @@ -337,9 +337,9 @@ func makeClients(bf stats.Features) ([]testgrpc.BenchmarkServiceClient, func()) if bf.ServerReadBufferSize >= 0 { sopts = append(sopts, grpc.ReadBufferSize(bf.ServerReadBufferSize)) } - if bf.ShareWriteBuffer { - opts = append(opts, grpc.WithShareWriteBuffer(true)) - sopts = append(sopts, grpc.ShareWriteBuffer(true)) + if bf.SharedWriteBuffer { + opts = append(opts, grpc.WithSharedWriteBuffer(true)) + sopts = append(sopts, grpc.SharedWriteBuffer(true)) } if bf.ServerWriteBufferSize >= 0 { sopts = append(sopts, grpc.WriteBufferSize(bf.ServerWriteBufferSize)) @@ -609,7 +609,7 @@ type featureOpts struct { serverWriteBufferSize []int sleepBetweenRPCs []time.Duration recvBufferPools []string - shareWriteBuffer []bool + sharedWriteBuffer []bool } // makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each @@ -658,8 +658,8 @@ func makeFeaturesNum(b *benchOpts) []int { featuresNum[i] = len(b.features.sleepBetweenRPCs) case stats.RecvBufferPool: featuresNum[i] = len(b.features.recvBufferPools) - case stats.ShareWriteBuffer: - featuresNum[i] = len(b.features.shareWriteBuffer) + case stats.SharedWriteBuffer: + featuresNum[i] = len(b.features.sharedWriteBuffer) default: log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, stats.MaxFeatureIndex) } @@ -729,7 +729,7 @@ func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features { ServerWriteBufferSize: b.features.serverWriteBufferSize[curPos[stats.ServerWriteBufferSize]], SleepBetweenRPCs: b.features.sleepBetweenRPCs[curPos[stats.SleepBetweenRPCs]], RecvBufferPool: b.features.recvBufferPools[curPos[stats.RecvBufferPool]], - ShareWriteBuffer: b.features.shareWriteBuffer[curPos[stats.ShareWriteBuffer]], + SharedWriteBuffer: b.features.sharedWriteBuffer[curPos[stats.SharedWriteBuffer]], } if len(b.features.reqPayloadCurves) == 0 { f.ReqSizeBytes = b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]] @@ -803,7 +803,7 @@ func processFlags() *benchOpts { serverWriteBufferSize: append([]int(nil), *serverWriteBufferSize...), sleepBetweenRPCs: append([]time.Duration(nil), *sleepBetweenRPCs...), recvBufferPools: setRecvBufferPool(*recvBufferPool), - shareWriteBuffer: setToggleMode(*shareWriteBuffer), + sharedWriteBuffer: setToggleMode(*sharedWriteBuffer), }, } diff --git a/benchmark/stats/stats.go b/benchmark/stats/stats.go index 5cf82a7dcb82..e42c5b6c0f24 100644 --- a/benchmark/stats/stats.go +++ b/benchmark/stats/stats.go @@ -58,7 +58,7 @@ const ( ServerWriteBufferSize SleepBetweenRPCs RecvBufferPool - ShareWriteBuffer + SharedWriteBuffer // MaxFeatureIndex is a place holder to indicate the total number of feature // indices we have. Any new feature indices should be added above this. @@ -130,8 +130,8 @@ type Features struct { SleepBetweenRPCs time.Duration // RecvBufferPool represents the shared recv buffer pool used. RecvBufferPool string - // ShareWriteBuffer configures whether both client and server share per-connection write buffer - ShareWriteBuffer bool + // SharedWriteBuffer configures whether both client and server share per-connection write buffer + SharedWriteBuffer bool } // String returns all the feature values as a string. @@ -151,13 +151,13 @@ func (f Features) String() string { "trace_%v-latency_%v-kbps_%v-MTU_%v-maxConcurrentCalls_%v-%s-%s-"+ "compressor_%v-channelz_%v-preloader_%v-clientReadBufferSize_%v-"+ "clientWriteBufferSize_%v-serverReadBufferSize_%v-serverWriteBufferSize_%v-"+ - "sleepBetweenRPCs_%v-connections_%v-recvBufferPool_%v-shareWriteBuffer_%v", + "sleepBetweenRPCs_%v-connections_%v-recvBufferPool_%v-sharedWriteBuffer_%v", f.NetworkMode, f.UseBufConn, f.EnableKeepalive, f.BenchTime, f.EnableTrace, f.Latency, f.Kbps, f.MTU, f.MaxConcurrentCalls, reqPayloadString, respPayloadString, f.ModeCompressor, f.EnableChannelz, f.EnablePreloader, f.ClientReadBufferSize, f.ClientWriteBufferSize, f.ServerReadBufferSize, f.ServerWriteBufferSize, f.SleepBetweenRPCs, f.Connections, - f.RecvBufferPool, f.ShareWriteBuffer) + f.RecvBufferPool, f.SharedWriteBuffer) } // SharedFeatures returns the shared features as a pretty printable string. @@ -233,8 +233,8 @@ func (f Features) partialString(b *bytes.Buffer, wantFeatures []bool, sep, delim b.WriteString(fmt.Sprintf("SleepBetweenRPCs%v%v%v", sep, f.SleepBetweenRPCs, delim)) case RecvBufferPool: b.WriteString(fmt.Sprintf("RecvBufferPool%v%v%v", sep, f.RecvBufferPool, delim)) - case ShareWriteBuffer: - b.WriteString(fmt.Sprintf("ShareWriteBuffer%v%v%v", sep, f.ShareWriteBuffer, delim)) + case SharedWriteBuffer: + b.WriteString(fmt.Sprintf("SharedWriteBuffer%v%v%v", sep, f.SharedWriteBuffer, delim)) default: log.Fatalf("Unknown feature index %v. maxFeatureIndex is %v", i, MaxFeatureIndex) } diff --git a/dialoptions.go b/dialoptions.go index e87744dbefaa..578ac7635827 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -139,12 +139,12 @@ func newJoinDialOption(opts ...DialOption) DialOption { return &joinDialOption{opts: opts} } -// WithShareWriteBuffer allows reusing per-connection transport write buffer. +// WithSharedWriteBufferWithSharedWriteBuffer allows reusing per-connection transport write buffer. // If this option is set to true every connection will release the buffer after // flushing the data on the wire. -func WithShareWriteBuffer(val bool) DialOption { +func WithSharedWriteBuffer(val bool) DialOption { return newFuncDialOption(func(o *dialOptions) { - o.copts.ShareWriteBuffer = val + o.copts.SharedWriteBuffer = val }) } diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 19bfcc0cca92..52b88c32b15b 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -330,7 +330,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts readerDone: make(chan struct{}), writerDone: make(chan struct{}), goAway: make(chan struct{}), - framer: newFramer(conn, writeBufSize, readBufSize, opts.ShareWriteBuffer, maxHeaderListSize), + framer: newFramer(conn, writeBufSize, readBufSize, opts.SharedWriteBuffer, maxHeaderListSize), fc: &trInFlow{limit: uint32(icwz)}, scheme: scheme, activeStreams: make(map[uint32]*Stream), diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 9e5da31d30c1..c48091f6c0a6 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -165,7 +165,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, if config.MaxHeaderListSize != nil { maxHeaderListSize = *config.MaxHeaderListSize } - framer := newFramer(conn, writeBufSize, readBufSize, config.ShareWriteBuffer, maxHeaderListSize) + framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize) // Send initial settings as connection preface to client. isettings := []http2.Setting{{ ID: http2.SettingMaxFrameSize, diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 63a43c26c3c5..650730c7f43b 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -406,7 +406,7 @@ type framer struct { var writeBufferPoolMap map[int]*sync.Pool = make(map[int]*sync.Pool) var writeBufferMutex sync.Mutex -func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, shareWriteBuffer bool, maxHeaderListSize uint32) *framer { +func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBuffer bool, maxHeaderListSize uint32) *framer { if writeBufferSize < 0 { writeBufferSize = 0 } @@ -414,7 +414,10 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, shareWriteBuf if readBufferSize > 0 { r = bufio.NewReaderSize(r, readBufferSize) } - pool := getWriteBufferPool(writeBufferSize, shareWriteBuffer) + var pool *sync.Pool + if sharedWriteBuffer { + pool = getWriteBufferPool(writeBufferSize) + } w := newBufWriter(conn, writeBufferSize, pool) f := &framer{ writer: w, @@ -429,23 +432,21 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, shareWriteBuf return f } -func getWriteBufferPool(writeBufferSize int, shareWriteBuffer bool) *sync.Pool { - if !shareWriteBuffer { - return nil - } +func getWriteBufferPool(writeBufferSize int) *sync.Pool { writeBufferMutex.Lock() defer writeBufferMutex.Unlock() size := writeBufferSize * 2 pool, ok := writeBufferPoolMap[size] - if !ok { - pool = &sync.Pool{ - New: func() interface{} { - b := make([]byte, size) - return &b - }, - } - writeBufferPoolMap[size] = pool + if ok { + return pool + } + pool = &sync.Pool{ + New: func() interface{} { + b := make([]byte, size) + return &b + }, } + writeBufferPoolMap[size] = pool return pool } diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 1f738b8703cc..3828b3e4a8d1 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -559,7 +559,7 @@ type ServerConfig struct { InitialConnWindowSize int32 WriteBufferSize int ReadBufferSize int - ShareWriteBuffer bool + SharedWriteBuffer bool ChannelzParentID *channelz.Identifier MaxHeaderListSize *uint32 HeaderTableSize *uint32 @@ -593,8 +593,8 @@ type ConnectOptions struct { WriteBufferSize int // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall. ReadBufferSize int - // ReadBufferSize indicates whether connections should reuse write buffer - ShareWriteBuffer bool + // SharedWriteBuffer indicates whether connections should reuse write buffer + SharedWriteBuffer bool // ChannelzParentID sets the addrConn id which initiate the creation of this client transport. ChannelzParentID *channelz.Identifier // MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received. diff --git a/server.go b/server.go index 03b61689eaaf..0770c1e6c0e4 100644 --- a/server.go +++ b/server.go @@ -170,7 +170,7 @@ type serverOptions struct { initialConnWindowSize int32 writeBufferSize int readBufferSize int - shareWriteBuffer bool + sharedWriteBuffer bool connectionTimeout time.Duration maxHeaderListSize *uint32 headerTableSize *uint32 @@ -236,12 +236,12 @@ func newJoinServerOption(opts ...ServerOption) ServerOption { return &joinServerOption{opts: opts} } -// ShareWriteBuffer allows reusing per-connection transport write buffer. +// SharedWriteBuffer allows reusing per-connection transport write buffer. // If this option is set to true every connection will release the buffer after // flushing the data on the wire. -func ShareWriteBuffer(val bool) ServerOption { +func SharedWriteBuffer(val bool) ServerOption { return newFuncServerOption(func(o *serverOptions) { - o.shareWriteBuffer = val + o.sharedWriteBuffer = val }) } @@ -948,7 +948,7 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport { InitialConnWindowSize: s.opts.initialConnWindowSize, WriteBufferSize: s.opts.writeBufferSize, ReadBufferSize: s.opts.readBufferSize, - ShareWriteBuffer: s.opts.shareWriteBuffer, + SharedWriteBuffer: s.opts.sharedWriteBuffer, ChannelzParentID: s.channelzID, MaxHeaderListSize: s.opts.maxHeaderListSize, HeaderTableSize: s.opts.headerTableSize, From 9aeb148f58072d0429ef781f80d3af88f755f5e7 Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Thu, 13 Jul 2023 09:12:56 +0200 Subject: [PATCH 11/12] fix linter error --- dialoptions.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dialoptions.go b/dialoptions.go index 578ac7635827..566f2b938e7f 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -139,7 +139,7 @@ func newJoinDialOption(opts ...DialOption) DialOption { return &joinDialOption{opts: opts} } -// WithSharedWriteBufferWithSharedWriteBuffer allows reusing per-connection transport write buffer. +// WithSharedWriteBuffer allows reusing per-connection transport write buffer. // If this option is set to true every connection will release the buffer after // flushing the data on the wire. func WithSharedWriteBuffer(val bool) DialOption { From 82a400ec5a712faac1cbaf6550ef57b12e1568d2 Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Thu, 20 Jul 2023 14:14:39 -0600 Subject: [PATCH 12/12] Add Experimental tag, rename flush to flushKeepBuffer --- dialoptions.go | 5 +++++ internal/transport/http_util.go | 6 +++--- server.go | 5 +++++ 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/dialoptions.go b/dialoptions.go index 566f2b938e7f..1fd0d5c127f4 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -142,6 +142,11 @@ func newJoinDialOption(opts ...DialOption) DialOption { // WithSharedWriteBuffer allows reusing per-connection transport write buffer. // If this option is set to true every connection will release the buffer after // flushing the data on the wire. +// +// # Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a +// later release. func WithSharedWriteBuffer(val bool) DialOption { return newFuncDialOption(func(o *dialOptions) { o.copts.SharedWriteBuffer = val diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 650730c7f43b..add1e9b2cc04 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -349,14 +349,14 @@ func (w *bufWriter) Write(b []byte) (n int, err error) { w.offset += nn n += nn if w.offset >= w.batchSize { - err = w.flush() + err = w.flushKeepBuffer() } } return n, err } func (w *bufWriter) Flush() error { - err := w.flush() + err := w.flushKeepBuffer() // Only release the buffer if we are in a "shared" mode if w.buf != nil && w.pool != nil { b := w.buf @@ -366,7 +366,7 @@ func (w *bufWriter) Flush() error { return err } -func (w *bufWriter) flush() error { +func (w *bufWriter) flushKeepBuffer() error { if w.err != nil { return w.err } diff --git a/server.go b/server.go index 0770c1e6c0e4..01b3265223c6 100644 --- a/server.go +++ b/server.go @@ -239,6 +239,11 @@ func newJoinServerOption(opts ...ServerOption) ServerOption { // SharedWriteBuffer allows reusing per-connection transport write buffer. // If this option is set to true every connection will release the buffer after // flushing the data on the wire. +// +// # Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a +// later release. func SharedWriteBuffer(val bool) ServerOption { return newFuncServerOption(func(o *serverOptions) { o.sharedWriteBuffer = val