Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rawkv: Context leak in BatchPut #696

Closed
pingyu opened this issue Feb 8, 2023 · 1 comment · Fixed by #697 or #704
Closed

rawkv: Context leak in BatchPut #696

pingyu opened this issue Feb 8, 2023 · 1 comment · Fixed by #697 or #704
Assignees

Comments

@pingyu
Copy link
Contributor

pingyu commented Feb 8, 2023

Customers report that memory usage of TiKV-CDC keep increasing along with time, which using BatchPut interface to write data to downstream TiKV cluster.

This issue seem to be caused by context leak of BatchPut interface.

This following codes can reproduce this issue on master of client-go:

package main

import (
	"context"
	"fmt"
	"log"

	"net/http"
	_ "net/http/pprof"

	"github.com/tikv/client-go/v2/config"
	"github.com/tikv/client-go/v2/rawkv"
	"golang.org/x/sync/errgroup"
)

const THREAD_COUNT int = 100
const BATCH_SIZE int = 16

func main() {
	cli, err := rawkv.NewClient(context.TODO(), []string{"127.0.0.1:4379"}, config.DefaultConfig().Security)
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	fmt.Printf("cluster ID: %d\n", cli.ClusterID())

	ctx := context.TODO()
	g, gctx := errgroup.WithContext(ctx)

	g.Go(func() error {
		log.Println(http.ListenAndServe("localhost:6060", nil))
		return nil
	})

	for idx := 0; idx < THREAD_COUNT; idx += 1 {
		keys := make([][]byte, 0, BATCH_SIZE)
		values := make([][]byte, 0, BATCH_SIZE)
		for i := 0; i < BATCH_SIZE; i += 1 {
			keys = append(keys, []byte(fmt.Sprintf("%08d-%08d", idx, i)))
			values = append(values, []byte("v"))
		}
		g.Go(func() error {
			for {
				if err := cli.BatchPut(gctx, keys, values); err != nil {
					return err
				}
			}
		})
	}

	if err := g.Wait(); err != nil {
		fmt.Printf("error: %v", err)
	}
}

Memory usage of this program increased from hundreds of MBs to several GBs in minutes.
image

Top 3 of pprof inuse heap memory:

heap profile: 7054: 322092072 [222329: 742479592] @ heap/1048576
1: 320864256 [1: 320864256] @ 0x4104ee 0x411f99 0x410d45 0x499ecb 0x499cc5 0xc4cf05 0xd2c593 0xd2c067 0xd2e30c 0xd2e2f7 0xd2d924 0x46f4c1
#	0x499eca	context.propagateCancel+0x10a						/disk1/home/pingyu/opt/go-1.19.2/src/context/context.go:273
#	0x499cc4	context.WithCancel+0xc4							/disk1/home/pingyu/opt/go-1.19.2/src/context/context.go:237
#	0xc4cf04	github.com/tikv/client-go/v2/internal/retry.(*Backoffer).Fork+0x44	/disk1/home/pingyu/workspace/client-go/internal/retry/backoff.go:259
#	0xd2c592	github.com/tikv/client-go/v2/rawkv.(*Client).sendBatchPut+0x352		/disk1/home/pingyu/workspace/client-go/rawkv/rawkv.go:889
#	0xd2c066	github.com/tikv/client-go/v2/rawkv.(*Client).BatchPutWithTTL+0x3a6	/disk1/home/pingyu/workspace/client-go/rawkv/rawkv.go:421
#	0xd2e30b	github.com/tikv/client-go/v2/rawkv.(*Client).BatchPut+0xeb		/disk1/home/pingyu/workspace/client-go/rawkv/rawkv.go:403
#	0xd2e2f6	main.main.func2+0xd6							/disk1/home/pingyu/workspace/client-go/examples/rawkv/rawkv_batch_put.go:59
#	0xd2d923	golang.org/x/sync/errgroup.(*Group).Go.func1+0x63			/disk1/home/pingyu/go/pkg/mod/golang.org/x/sync@v0.1.0/errgroup/errgroup.go:75

2789: 401616 [2789: 401616] @ 0x410ce3 0x499ecb 0x499cc5 0xc4cf05 0xd2cb55 0x46f4c1
#	0x499eca	context.propagateCancel+0x10a						/disk1/home/pingyu/opt/go-1.19.2/src/context/context.go:273
#	0x499cc4	context.WithCancel+0xc4							/disk1/home/pingyu/opt/go-1.19.2/src/context/context.go:237
#	0xc4cf04	github.com/tikv/client-go/v2/internal/retry.(*Backoffer).Fork+0x44	/disk1/home/pingyu/workspace/client-go/internal/retry/backoff.go:259
#	0xd2cb54	github.com/tikv/client-go/v2/rawkv.(*Client).sendBatchPut.func1+0x74	/disk1/home/pingyu/workspace/client-go/rawkv/rawkv.go:894

1: 188416 [1: 188416] @ 0x71a91d 0x743251 0x745c85 0x745705 0x7462f7 0x7444c5 0x7432ea 0x73c925 0x73d051 0x73d035 0x73d045 0x76c5ea 0x76c710 0x76c710 0x76c3ec 0x7914fc 0x7aadf0 0x7b7031 0x7b701a 0x72c869 0x72c86a 0x72c6b9 0x7d0bc5 0x8e20ce 0x8e20e9 0x9407e4 0x9585f2 0x95249d 0xc0258b 0xc591ca 0xc581a5 0xc5757e
#	0x71a91c	google.golang.org/protobuf/internal/strs.(*Builder).AppendFullName+0x11c		/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/internal/strs/strings_unsafe.go:68
#	0x743250	google.golang.org/protobuf/internal/filedesc.appendFullName+0x50			/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/internal/filedesc/desc_init.go:470
#	0x745c84	google.golang.org/protobuf/internal/filedesc.(*EnumValue).unmarshalFull+0x324		/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/internal/filedesc/desc_lazy.go:267
#	0x745704	google.golang.org/protobuf/internal/filedesc.(*Enum).unmarshalFull+0x964		/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/internal/filedesc/desc_lazy.go:216
#	0x7462f6	google.golang.org/protobuf/internal/filedesc.(*Message).unmarshalFull+0x456		/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/internal/filedesc/desc_lazy.go:306
#	0x7444c4	google.golang.org/protobuf/internal/filedesc.(*File).unmarshalFull+0x384		/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/internal/filedesc/desc_lazy.go:166
#	0x7432e9	google.golang.org/protobuf/internal/filedesc.(*File).lazyRawInit+0x29			/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/internal/filedesc/desc_lazy.go:20
#	0x73c924	google.golang.org/protobuf/internal/filedesc.(*File).lazyInitOnce+0x64			/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/internal/filedesc/desc.go:97
#	0x73d050	google.golang.org/protobuf/internal/filedesc.(*File).lazyInit+0x30			/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/internal/filedesc/desc.go:89
#	0x73d034	google.golang.org/protobuf/internal/filedesc.(*Message).lazyInit+0x14			/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/internal/filedesc/desc.go:250
#	0x73d044	google.golang.org/protobuf/internal/filedesc.(*Message).RequiredNumbers+0x24		/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/internal/filedesc/desc.go:236
#	0x76c5e9	google.golang.org/protobuf/internal/impl.needsInitCheckLocked+0x129			/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/internal/impl/checkinit.go:123
#	0x76c70f	google.golang.org/protobuf/internal/impl.needsInitCheckLocked+0x24f			/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/internal/impl/checkinit.go:136
#	0x76c70f	google.golang.org/protobuf/internal/impl.needsInitCheckLocked+0x24f			/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/internal/impl/checkinit.go:136
#	0x76c3eb	google.golang.org/protobuf/internal/impl.needsInitCheck+0xeb				/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/internal/impl/checkinit.go:101
#	0x7914fb	google.golang.org/protobuf/internal/impl.(*MessageInfo).makeCoderMethods+0xf9b		/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/internal/impl/codec_message.go:176
#	0x7aadef	google.golang.org/protobuf/internal/impl.(*MessageInfo).initOnce+0x20f			/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/internal/impl/message.go:91
#	0x7b7030	google.golang.org/protobuf/internal/impl.(*MessageInfo).init+0x30			/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/internal/impl/message.go:72
#	0x7b7019	google.golang.org/protobuf/internal/impl.(*messageReflectWrapper).ProtoMethods+0x19	/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/internal/impl/message_reflect_gen.go:150
#	0x72c868	google.golang.org/protobuf/proto.protoMethods+0x68					/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/proto/proto_methods.go:19
#	0x72c869	google.golang.org/protobuf/proto.MarshalOptions.marshal+0x69				/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/proto/encode.go:143
#	0x72c6b8	google.golang.org/protobuf/proto.MarshalOptions.MarshalAppend+0x78			/disk1/home/pingyu/go/pkg/mod/google.golang.org/protobuf@v1.28.1/proto/encode.go:125
#	0x7d0bc4	github.com/golang/protobuf/proto.marshalAppend+0xa4					/disk1/home/pingyu/go/pkg/mod/github.com/golang/protobuf@v1.5.2/proto/wire.go:40
#	0x8e20cd	github.com/golang/protobuf/proto.Marshal+0x4d						/disk1/home/pingyu/go/pkg/mod/github.com/golang/protobuf@v1.5.2/proto/wire.go:23
#	0x8e20e8	google.golang.org/grpc/encoding/proto.codec.Marshal+0x68				/disk1/home/pingyu/go/pkg/mod/google.golang.org/grpc@v1.52.3/encoding/proto/proto.go:45
#	0x9407e3	google.golang.org/grpc.encode+0x43							/disk1/home/pingyu/go/pkg/mod/google.golang.org/grpc@v1.52.3/rpc_util.go:595
#	0x9585f1	google.golang.org/grpc.prepareMsg+0xd1							/disk1/home/pingyu/go/pkg/mod/google.golang.org/grpc@v1.52.3/stream.go:1708
#	0x95249c	google.golang.org/grpc.(*clientStream).SendMsg+0xfc					/disk1/home/pingyu/go/pkg/mod/google.golang.org/grpc@v1.52.3/stream.go:846
#	0xc0258a	github.com/pingcap/kvproto/pkg/tikvpb.(*tikvBatchCommandsClient).Send+0x2a		/disk1/home/pingyu/go/pkg/mod/github.com/pingcap/kvproto@v0.0.0-20230206112125-0561adc37543/pkg/tikvpb/tikvpb.pb.go:2068
#	0xc591c9	github.com/tikv/client-go/v2/internal/client.(*batchCommandsClient).send+0x3e9		/disk1/home/pingyu/workspace/client-go/internal/client/client_batch.go:519
#	0xc581a4	github.com/tikv/client-go/v2/internal/client.(*batchConn).getClientAndSend+0x384	/disk1/home/pingyu/workspace/client-go/internal/client/client_batch.go:381
#	0xc5757d	github.com/tikv/client-go/v2/internal/client.(*batchConn).batchSendLoop+0x9d		/disk1/home/pingyu/workspace/client-go/internal/client/client_batch.go:344

Callgraph of inuse heap memory size:
image

Callgraph of inuse heap objects count:
image

This issue seems to be caused by the leak of context here. When there is not error, the cancel() will not be called, and the ctx in bo is leak.

func (c *Client) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte, ttls []uint64, opts *rawOptions) error {
	keyToValue := make(map[string][]byte, len(keys))
	keyToTTL := make(map[string]uint64, len(keys))
	for i, key := range keys {
		keyToValue[string(key)] = values[i]
		if len(ttls) > 0 {
			keyToTTL[string(key)] = ttls[i]
		}
	}
	groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil)
	if err != nil {
		return err
	}
	var batches []kvrpc.Batch
	// split the keys by size and RegionVerID
	for regionID, groupKeys := range groups {
		batches = kvrpc.AppendBatches(batches, regionID, groupKeys, keyToValue, keyToTTL, rawBatchPutSize)
	}
	bo, cancel := bo.Fork()
	ch := make(chan error, len(batches))
	for _, batch := range batches {
		batch1 := batch
		go func() {
			singleBatchBackoffer, singleBatchCancel := bo.Fork()
			defer singleBatchCancel()
			ch <- c.doBatchPut(singleBatchBackoffer, batch1, opts)
		}()
	}

	for i := 0; i < len(batches); i++ {
		if e := <-ch; e != nil {
			cancel()
			// catch the first error
			if err == nil {
				err = errors.WithStack(e)
			}
		}
	}
	return err
}
@pingyu
Copy link
Contributor Author

pingyu commented Feb 15, 2023

Another leak point was found in BatchDelete with similar cause.

Codes to reproduce the issue:

package main

import (
	"context"
	"fmt"
	"log"

	"net/http"
	_ "net/http/pprof"

	"github.com/tikv/client-go/v2/config"
	"github.com/tikv/client-go/v2/rawkv"
	"golang.org/x/sync/errgroup"
)

const THREAD_COUNT int = 100
const BATCH_SIZE int = 16

func main() {
	cli, err := rawkv.NewClient(context.TODO(), []string{"127.0.0.1:4379"}, config.DefaultConfig().Security)
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	fmt.Printf("cluster ID: %d\n", cli.ClusterID())

	ctx := context.TODO()
	g, gctx := errgroup.WithContext(ctx)

	g.Go(func() error {
		log.Println(http.ListenAndServe("localhost:6060", nil))
		return nil
	})

	for idx := 0; idx < THREAD_COUNT; idx += 1 {
		keys := make([][]byte, 0, BATCH_SIZE)
		values := make([][]byte, 0, BATCH_SIZE)
		for i := 0; i < BATCH_SIZE; i += 1 {
			keys = append(keys, []byte(fmt.Sprintf("%08d-%08d", idx, i)))
			values = append(values, []byte("v"))
		}
		g.Go(func() error {
			for {
				if err := cli.BatchPut(gctx, keys, values); err != nil {
					return err
				}
				if err := cli.BatchDelete(gctx, keys); err != nil {
					return err
				}
			}
		})
	}

	if err := g.Wait(); err != nil {
		fmt.Printf("error: %v", err)
	}
}

Memory usage:
image

Call graph:
image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
1 participant