diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go index ac89aba1421..d93b0af006b 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go @@ -22,6 +22,7 @@ import ( "encoding/binary" "strconv" "strings" + "sync" "time" "unsafe" @@ -35,6 +36,8 @@ var ( byteOrder = binary.BigEndian heartbeatRsp = []byte{0x00, 0x00, 0x00, 0x01, 0x01} heartbeatRspLen = len(heartbeatRsp) + reqPool *sync.Pool + batchPool *sync.Pool ) const ( @@ -42,6 +45,19 @@ const ( msgTypeHeartbeat uint8 = 1 ) +func init() { + reqPool = &sync.Pool{ + New: func() interface{} { + return &sendDataReq{} + }, + } + batchPool = &sync.Pool{ + New: func() interface{} { + return &batchReq{} + }, + } +} + type heartbeatReq struct { } @@ -68,6 +84,7 @@ func (h heartbeatReq) encode(buffer *bytes.Buffer) []byte { type batchCallback func() type batchReq struct { + pool *sync.Pool workerID string batchID string groupID string @@ -112,6 +129,10 @@ func (b *batchReq) done(err error) { b.metrics.observeTime(errorCode, time.Since(b.batchTime).Milliseconds()) b.metrics.observeSize(errorCode, b.dataSize) } + + if b.pool != nil { + b.pool.Put(b) + } } func (b *batchReq) encode() []byte { @@ -299,6 +320,7 @@ func (b *batchRsp) decode(input []byte) { } type sendDataReq struct { + pool *sync.Pool ctx context.Context msg Message callback Callback @@ -328,6 +350,10 @@ func (s *sendDataReq) done(err error, errCode string) { s.metrics.incMessage(errCode) } + + if s.pool != nil { + s.pool.Put(s) + } } type closeReq struct { diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go index 7edfc0c6dcf..eb4a8348ac5 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go @@ -244,7 +244,9 @@ func (w *worker) start() { } func (w *worker) doSendAsync(ctx context.Context, msg Message, callback Callback, flushImmediately bool) { - req := &sendDataReq{ + req := reqPool.Get().(*sendDataReq) + *req = sendDataReq{ + pool: reqPool, ctx: ctx, msg: msg, callback: callback, @@ -319,7 +321,9 @@ func (w *worker) handleSendData(req *sendDataReq) { batch, ok := w.pendingBatches[req.msg.StreamID] if !ok { streamID := req.msg.StreamID - batch = &batchReq{ + batch = batchPool.Get().(*batchReq) + *batch = batchReq{ + pool: batchPool, workerID: w.indexStr, batchID: util.SnowFlakeID(), groupID: w.options.GroupID,