From 2e769133338110d2071793c9605f90622db1c2fb Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 19 Nov 2024 03:01:18 +0100 Subject: [PATCH 1/7] fix: memory leak when assigning the same key with ttl multiply times Signed-off-by: Valery Piashchynski --- go.work.sum | 21 ++ memorykv/item.go | 7 +- memorykv/kv.go | 245 +++++++++++------------- memorykv/map.go | 79 ++++++++ tests/configs/.rr-in-memory-memory.yaml | 13 ++ tests/configs/.rr-in-memory.yaml | 9 +- tests/kv_memory_test.go | 111 +++++++++++ 7 files changed, 337 insertions(+), 148 deletions(-) create mode 100644 memorykv/map.go create mode 100644 tests/configs/.rr-in-memory-memory.yaml diff --git a/go.work.sum b/go.work.sum index 9422229..5b0bb47 100644 --- a/go.work.sum +++ b/go.work.sum @@ -1,5 +1,7 @@ cel.dev/expr v0.15.0 h1:O1jzfJCQBfL5BFoYktaxwIhuttaQPsVWerH9/EEKx0w= cel.dev/expr v0.15.0/go.mod h1:TRSuuV7DlVCE/uwv5QbAiW/v8l5O8C4eEPHeu7gf7Sg= +cel.dev/expr v0.16.1 h1:NR0+oFYzR1CqLFhTAqg3ql59G9VfN8fKq1TCHJ6gq1g= +cel.dev/expr v0.16.1/go.mod h1:AsGA5zb3WruAEQeQng1RZdGEXmBj0jvMWh6l5SnNuC8= cloud.google.com/go v0.110.8/go.mod h1:Iz8AkXJf1qmxC3Oxoep8R1T36w8B92yU29PcBhHO5fk= cloud.google.com/go v0.111.0 h1:YHLKNupSD1KqjDbQ3+LVdQ81h/UJbJyZG203cEfnQgM= cloud.google.com/go v0.111.0/go.mod h1:0mibmpKP1TyOOFYQY5izo0LnT+ecvOQ0Sg3OdmMiNRU= @@ -97,6 +99,8 @@ cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGB cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc= cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= +cloud.google.com/go/compute/metadata v0.5.0 h1:Zr0eK8JbFv6+Wi4ilXAR8FJ3wyNdpxHKJNPos6LTZOY= +cloud.google.com/go/compute/metadata v0.5.0/go.mod h1:aHnloV2TPI38yx4s9+wAZhHykWvVCfu7hQbF+9CWoiY= cloud.google.com/go/contactcenterinsights v1.10.0 h1:YR2aPedGVQPpFBZXJnPkqRj8M//8veIZZH5ZvICoXnI= cloud.google.com/go/contactcenterinsights v1.12.1 h1:EiGBeejtDDtr3JXt9W7xlhXyZ+REB5k2tBgVPVtmNb0= cloud.google.com/go/contactcenterinsights v1.12.1/go.mod h1:HHX5wrz5LHVAwfI2smIotQG9x8Qd6gYilaHcLLLmNis= @@ -462,6 +466,8 @@ github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50 h1:DBmgJDC9dTfkVyGgipa github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50/go.mod h1:5e1+Vvlzido69INQaVO6d87Qn543Xr6nooe9Kz7oBFM= github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b h1:ga8SEFjZ60pxLcmhnThWgvH2wg8376yUJmPhEH4H3kw= github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= +github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78 h1:QVw89YDxXxEe+l8gU8ETbOasdwEV+avkR75ZzsVV9WI= +github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= @@ -483,10 +489,14 @@ github.com/envoyproxy/go-control-plane v0.11.1 h1:wSUXTlLfiAQRWs2F+p+EKOY9rUyis1 github.com/envoyproxy/go-control-plane v0.11.1/go.mod h1:uhMcXKCQMEJHiAb0w+YGefQLaTEw+YhGluxZkrTmD0g= github.com/envoyproxy/go-control-plane v0.12.0 h1:4X+VP1GHd1Mhj6IB5mMeGbLCleqxjletLK6K0rbxyZI= github.com/envoyproxy/go-control-plane v0.12.0/go.mod h1:ZBTaoJ23lqITozF0M6G4/IragXCQKCnYbmlmtHvwRG0= +github.com/envoyproxy/go-control-plane v0.13.0 h1:HzkeUz1Knt+3bK+8LG1bxOO/jzWZmdxpwC51i202les= +github.com/envoyproxy/go-control-plane v0.13.0/go.mod h1:GRaKG3dwvFoTg4nj7aXdZnvMg4d7nvT/wl9WgVXn3Q8= github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA= github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= +github.com/envoyproxy/protoc-gen-validate v1.1.0 h1:tntQDh69XqOCOZsDz0lVJQez/2L6Uu2PdjCQwWCJ3bM= +github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8= @@ -526,8 +536,11 @@ github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68= github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= github.com/golang/glog v1.2.1 h1:OptwRhECazUx5ix5TTWC3EZhsZEHWcYWY4FQHTIubm4= github.com/golang/glog v1.2.1/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= +github.com/golang/glog v1.2.2 h1:1+mZ9upx1Dh6FmUTFR1naJ77miKiXgALjWOZ3NVFPmY= +github.com/golang/glog v1.2.2/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= @@ -623,6 +636,8 @@ github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY= github.com/kr/pty v1.1.1 h1:VkoXIwSboBpnk99O/KFauAEILuNHv5DVFKZMBN/gUgw= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/lyft/protoc-gen-star v0.6.1 h1:erE0rdztuaDq3bpGifD95wfoPrSZc95nGA6tbiNYh6M= github.com/lyft/protoc-gen-star/v2 v2.0.1 h1:keaAo8hRuAT0O3DfJ/wM3rufbAjGeJ1lAtWZHDjKGB0= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= @@ -672,6 +687,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/sftp v1.13.1 h1:I2qBYMChEhIjOgazfJmV3/mZM256btk6wkCDRmW7JYs= github.com/pkg/sftp v1.13.6 h1:JFZT4XbOU7l77xGSpOdW+pwIMqP044IyjXX6FGyEKFo= github.com/pkg/sftp v1.13.6/go.mod h1:tz1ryNURKu77RL+GuCzmoJYxQczL3wLNNpPWagdg4Qk= +github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo= +github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= github.com/quic-go/qtls-go1-20 v0.4.1 h1:D33340mCNDAIKBqXuAvexTNMUByrYmFYVfKfDN5nfFs= github.com/quic-go/qtls-go1-20 v0.4.1/go.mod h1:X9Nh97ZL80Z+bX/gUXMbipO6OxdiDi58b/fMC9mAL+k= github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA= @@ -752,6 +769,8 @@ golang.org/x/oauth2 v0.20.0 h1:4mQdhULixXKP1rwYBW0vAijoXnkTG0BLCDRzfe1idMo= golang.org/x/oauth2 v0.20.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs= golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= +golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= @@ -764,6 +783,8 @@ golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA= golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4= +golang.org/x/term v0.26.0 h1:WEQa6V3Gja/BhNxg540hBip/kkaYtRg3cxg4oXSw4AU= +golang.org/x/term v0.26.0/go.mod h1:Si5m1o57C5nBNQo5z1iq+XDijt21BDBDp2bK0QI8e3E= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.9.3 h1:Gn1I8+64MsuTb/HpH+LmQtNas23LhUVr3rYZ0eKuaMM= golang.org/x/tools v0.9.3/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc= diff --git a/memorykv/item.go b/memorykv/item.go index 7bde27e..d5273b4 100644 --- a/memorykv/item.go +++ b/memorykv/item.go @@ -1,9 +1,10 @@ package memorykv type Item struct { - key string - value []byte - timeout string + key string + value []byte + timeout string + callback *cb } func (i *Item) Key() string { diff --git a/memorykv/kv.go b/memorykv/kv.go index 06df07f..4681f8a 100644 --- a/memorykv/kv.go +++ b/memorykv/kv.go @@ -9,7 +9,6 @@ import ( "github.com/roadrunner-server/api/v4/plugins/v1/kv" "github.com/roadrunner-server/errors" - "go.opentelemetry.io/otel/attribute" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.uber.org/zap" ) @@ -18,23 +17,19 @@ const ( tracerName = "inmemory" ) -type callback func(sCh <-chan struct{}) - type cb struct { updateCh chan int // new ttl stopCh chan struct{} } type Driver struct { - heap sync.Map // map[string]kv.Item - // callbacks contains all callbacks channels for the keys - callbacks sync.Map // map[string]*cb + heap *hmap + mu *sync.Mutex broadcastStopCh atomic.Pointer[chan struct{}] - mapSize int64 - tracer *sdktrace.TracerProvider - log *zap.Logger - cfg *Config + tracer *sdktrace.TracerProvider + log *zap.Logger + cfg *Config } type Configurer interface { @@ -52,11 +47,10 @@ func NewInMemoryDriver(key string, log *zap.Logger, cfgPlugin Configurer, tracer } d := &Driver{ - callbacks: sync.Map{}, - heap: sync.Map{}, - mapSize: 0, - log: log, - tracer: tracer, + mu: &sync.Mutex{}, + heap: newHMap(), + log: log, + tracer: tracer, } ch := make(chan struct{}) @@ -87,6 +81,7 @@ func (d *Driver) Has(keys ...string) (map[string]bool, error) { return nil, errors.E(op, errors.NoKeys) } + // todo(rustatian): move this map to a sync.Pool? m := make(map[string]bool) for i := range keys { keyTrimmed := strings.TrimSpace(keys[i]) @@ -94,13 +89,11 @@ func (d *Driver) Has(keys ...string) (map[string]bool, error) { return nil, errors.E(op, errors.EmptyKey) } - if _, ok := d.heap.Load(keys[i]); ok { + if _, ok := d.heap.Get(keys[i]); ok { m[keys[i]] = true } } - span.SetAttributes(attribute.Int64("allocated memory:bytes", d.loadAllocatedSize())) - return m, nil } @@ -117,15 +110,12 @@ func (d *Driver) Get(key string) ([]byte, error) { return nil, errors.E(op, errors.EmptyKey) } - if data, exist := d.heap.Load(key); exist { + if data, exist := d.heap.Get(key); exist { // here might be a panic // but data only could be a string, see Set function - span.SetAttributes(attribute.Int64("allocated memory:bytes", d.loadAllocatedSize())) - return data.(kv.Item).Value(), nil + return data.Value(), nil } - span.SetAttributes(attribute.Int64("allocated memory:bytes", d.loadAllocatedSize())) - return nil, nil } @@ -151,13 +141,11 @@ func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { m := make(map[string][]byte, len(keys)) for i := 0; i < len(keys); i++ { - if value, ok := d.heap.Load(keys[i]); ok { - m[keys[i]] = value.(kv.Item).Value() + if value, ok := d.heap.Get(keys[i]); ok { + m[keys[i]] = value.Value() } } - span.SetAttributes(attribute.Int64("allocated memory:bytes", d.loadAllocatedSize())) - return m, nil } @@ -175,6 +163,12 @@ func (d *Driver) Set(items ...kv.Item) error { if items[i] == nil { continue } + + // check for the duplicates + d.heap.Delete(items[i].Key()) + + // at this point the duplicate key is removed + // TTL is set if items[i].Timeout() != "" { // check the TTL in the item @@ -186,33 +180,40 @@ func (d *Driver) Set(items ...kv.Item) error { tm := int(tt.UTC().Sub(time.Now().UTC()).Seconds()) // we already in the future :) - if tm < 0 { - d.updateAllocatedSize(int64(len(items[i].Key()) + len(items[i].Value()) + len(items[i].Timeout()))) + if tm <= 0 { + d.log.Warn("incorrect TTL time, saving without it", zap.String("key", items[i].Key())) // set item - d.heap.Store(items[i].Key(), items[i]) + d.heap.Set(items[i].Key(), &Item{ + key: items[i].Key(), + value: items[i].Value(), + }) continue } + // at this point, we have valid TTL and should save the item with the callback + // create callback to delete the key from the heap - clbk, stopCh, updateCh := d.ttlcallback(items[i].Key(), tm) - go func() { - clbk(*d.broadcastStopCh.Load()) - }() - - // store the callback since we have TTL - d.callbacks.Store(items[i].Key(), &cb{ - updateCh: updateCh, - stopCh: stopCh, + stopCh, updateCh := d.ttlcallback(items[i].Key(), tm, *d.broadcastStopCh.Load()) + + d.heap.Set(items[i].Key(), &Item{ + key: items[i].Key(), + value: items[i].Value(), + timeout: items[i].Timeout(), + callback: &cb{ + updateCh: updateCh, + stopCh: stopCh, + }, + }) + } else { + // set item without TTL + d.log.Debug("saving item without TTL", zap.String("key", items[i].Key())) + d.heap.Set(items[i].Key(), &Item{ + key: items[i].Key(), + value: items[i].Value(), }) } - - d.updateAllocatedSize(int64(len(items[i].Key()) + len(items[i].Value()) + len(items[i].Timeout()))) - // set item - d.heap.Store(items[i].Key(), items[i]) } - span.SetAttributes(attribute.Int64("allocated memory:bytes", d.loadAllocatedSize())) - return nil } @@ -246,26 +247,25 @@ func (d *Driver) MExpire(items ...kv.Item) error { ttm = 0 } - if clb, ok := d.callbacks.Load(items[i].Key()); ok { + if clb, ok := d.heap.Get(items[i].Key()); ok { // send new ttl to the callback - clb.(*cb).updateCh <- ttm + clb.callback.updateCh <- ttm } else { // we should set the callback // create callback to delete the key from the heap - clbk, stopCh, updateCh := d.ttlcallback(items[i].Key(), ttm) - go func() { - clbk(*d.broadcastStopCh.Load()) - }() - - d.callbacks.Store(items[i].Key(), &cb{ - updateCh: updateCh, - stopCh: stopCh, + stopCh, updateCh := d.ttlcallback(items[i].Key(), ttm, *d.broadcastStopCh.Load()) + d.heap.Set(items[i].Key(), &Item{ + key: items[i].Key(), + value: items[i].Value(), + timeout: items[i].Timeout(), + callback: &cb{ + updateCh: updateCh, + stopCh: stopCh, + }, }) } } - span.SetAttributes(attribute.Int64("allocated memory:bytes", d.loadAllocatedSize())) - return nil } @@ -291,13 +291,11 @@ func (d *Driver) TTL(keys ...string) (map[string]string, error) { m := make(map[string]string, len(keys)) for i := range keys { - if item, ok := d.heap.Load(keys[i]); ok { - m[keys[i]] = item.(kv.Item).Timeout() + if item, ok := d.heap.Get(keys[i]); ok { + m[keys[i]] = item.Timeout() } } - span.SetAttributes(attribute.Int64("allocated memory:bytes", d.loadAllocatedSize())) - return m, nil } @@ -321,16 +319,15 @@ func (d *Driver) Delete(keys ...string) error { } for i := range keys { - d.heap.Delete(keys[i]) - clbk, ok := d.callbacks.LoadAndDelete(keys[i]) + k, ok := d.heap.LoadAndDelete(keys[i]) if ok { - // send signal to stop the timer and delete the item - clbk.(*cb).stopCh <- struct{}{} + if k.callback != nil { + // send signal to stop the timer and delete the item + k.callback.stopCh <- struct{}{} + } } } - span.SetAttributes(attribute.Int64("allocated memory:bytes", d.loadAllocatedSize())) - return nil } @@ -342,91 +339,65 @@ func (d *Driver) Clear() error { newCh := make(chan struct{}) d.broadcastStopCh.Swap(&newCh) - - d.heap.Range(func(key any, _ any) bool { - d.heap.Delete(key) - return true - }) - - // zero the allocated size - atomic.StoreInt64(&d.mapSize, 0) + d.heap.Clean() + d.heap = newHMap() return nil } -func (d *Driver) updateAllocatedSize(newsize int64) { - if newsize > 0 { - atomic.AddInt64(&d.mapSize, newsize) - return - } - - curr := atomic.LoadInt64(&d.mapSize) - if curr >= newsize { - atomic.AddInt64(&d.mapSize, newsize) - } else { - atomic.StoreInt64(&d.mapSize, 0) - } -} - -func (d *Driver) loadAllocatedSize() int64 { - return atomic.LoadInt64(&d.mapSize) -} - func (d *Driver) Stop() { close(*d.broadcastStopCh.Load()) } // ================================== PRIVATE ====================================== -func (d *Driver) ttlcallback(id string, ttl int) (callback, chan struct{}, chan int) { +func (d *Driver) ttlcallback(id string, ttl int, sCh <-chan struct{}) (chan struct{}, chan int) { stopCbCh := make(chan struct{}, 1) updateTTLCh := make(chan int, 1) - // at this point, when adding lock, we should not have the callback - return func(sCh <-chan struct{}) { + go func(hid string) { // ttl cbttl := ttl ta := time.NewTicker(time.Second * time.Duration(cbttl)) - loop: - select { - case <-ta.C: - d.log.Debug("ttl expired", - zap.String("id", id), - zap.Int("ttl seconds", cbttl), - ) - ta.Stop() - // broadcast stop channel - case <-sCh: - d.log.Debug("ttl removed, broadcast call", - zap.String("id", id), - zap.Int("ttl seconds", cbttl), - ) - ta.Stop() - // item stop channel - case <-stopCbCh: - d.log.Debug("ttl removed, callback call", - zap.String("id", id), - zap.Int("ttl seconds", cbttl), - ) - ta.Stop() - case newTTL := <-updateTTLCh: - d.log.Debug("updating ttl", - zap.String("id", id), - zap.Int("prev_ttl", cbttl), - zap.Int("new_ttl", newTTL)) - // update callback TTL (for logs) - cbttl = newTTL - ta.Reset(time.Second * time.Duration(newTTL)) - // in case of TTL we don't need to remove the item, only update TTL - goto loop - } - - val, ok := d.heap.LoadAndDelete(id) - if ok { - // subtract the size of the item - d.updateAllocatedSize(-int64(len(id) + len(val.(kv.Item).Value()) + len(val.(kv.Item).Timeout()))) + for { + select { + case <-ta.C: + d.log.Debug("ttl expired", + zap.String("id", hid), + zap.Int("ttl seconds", cbttl), + ) + ta.Stop() + // removeEntry removes the entry w/o checking callback + d.heap.removeEntry(hid) + return + case <-sCh: + // broadcast stop channel + d.log.Debug("ttl removed, broadcast call", + zap.String("id", hid), + zap.Int("ttl seconds", cbttl), + ) + ta.Stop() + // removeEntry removes the entry w/o checking callback + d.heap.removeEntry(hid) + return + case <-stopCbCh: + // item stop channel + d.log.Debug("ttl removed, callback call", + zap.String("id", hid), + zap.Int("ttl seconds", cbttl), + ) + return + case newTTL := <-updateTTLCh: + // in case of TTL we don't need to remove the item, only update TTL + d.log.Debug("updating ttl", + zap.String("id", hid), + zap.Int("prev_ttl", cbttl), + zap.Int("new_ttl", newTTL)) + // update callback TTL (for logs) + cbttl = newTTL + ta.Reset(time.Second * time.Duration(newTTL)) + } } - - d.callbacks.Delete(id) - }, stopCbCh, updateTTLCh + }(id) + return stopCbCh, updateTTLCh } diff --git a/memorykv/map.go b/memorykv/map.go new file mode 100644 index 0000000..bc798e9 --- /dev/null +++ b/memorykv/map.go @@ -0,0 +1,79 @@ +package memorykv + +import ( + "sync" +) + +type hmap struct { + mu sync.RWMutex + items map[string]*Item +} + +func newHMap() *hmap { + return &hmap{ + items: make(map[string]*Item, 10), + } +} + +func (h *hmap) Get(key string) (*Item, bool) { + h.mu.RLock() + defer h.mu.RUnlock() + + item, ok := h.items[key] + return item, ok +} + +func (h *hmap) Set(key string, item *Item) { + h.mu.Lock() + defer h.mu.Unlock() + + h.items[key] = item +} + +func (h *hmap) LoadAndDelete(key string) (*Item, bool) { + h.mu.Lock() + defer h.mu.Unlock() + + item, ok := h.items[key] + if ok { + if item.callback != nil { + item.callback.stopCh <- struct{}{} + } + delete(h.items, key) + } + + return item, ok +} + +func (h *hmap) Clean() { + h.mu.Lock() + defer h.mu.Unlock() + + for k, v := range h.items { + if v != nil { + if v.callback != nil { + v.callback.stopCh <- struct{}{} + } + } + + delete(h.items, k) + } +} + +func (h *hmap) Delete(key string) { + h.mu.Lock() + defer h.mu.Unlock() + + if h.items[key] != nil && h.items[key].callback != nil { + h.items[key].callback.stopCh <- struct{}{} + } + + delete(h.items, key) +} + +func (h *hmap) removeEntry(key string) { + h.mu.Lock() + defer h.mu.Unlock() + + delete(h.items, key) +} diff --git a/tests/configs/.rr-in-memory-memory.yaml b/tests/configs/.rr-in-memory-memory.yaml new file mode 100644 index 0000000..a01c5d1 --- /dev/null +++ b/tests/configs/.rr-in-memory-memory.yaml @@ -0,0 +1,13 @@ +version: "3" + +rpc: + listen: tcp://127.0.0.1:6666 + +logs: + mode: development + level: error + +kv: + memory-rr: + driver: memory + config: {} diff --git a/tests/configs/.rr-in-memory.yaml b/tests/configs/.rr-in-memory.yaml index cce1d27..057724b 100644 --- a/tests/configs/.rr-in-memory.yaml +++ b/tests/configs/.rr-in-memory.yaml @@ -1,4 +1,4 @@ -version: '3' +version: "3" rpc: listen: tcp://127.0.0.1:6001 @@ -7,13 +7,6 @@ logs: mode: development level: debug -otel: - insecure: false - compress: true - exporter: stderr - service_name: rr_test_inmemory - service_version: 1.0.0 - kv: memory-rr: driver: memory diff --git a/tests/kv_memory_test.go b/tests/kv_memory_test.go index e282569..651b335 100644 --- a/tests/kv_memory_test.go +++ b/tests/kv_memory_test.go @@ -6,6 +6,7 @@ import ( "net/rpc" "os" "os/signal" + "runtime" "sync" "syscall" "testing" @@ -23,6 +24,7 @@ import ( rpcPlugin "github.com/roadrunner-server/rpc/v5" "github.com/roadrunner-server/server/v5" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestInMemoryOrder(t *testing.T) { @@ -92,6 +94,115 @@ func TestInMemoryOrder(t *testing.T) { wg.Wait() } +func TestSetManyMemory(t *testing.T) { + t.Skip("This test is executed manually") + cont := endure.New(slog.LevelDebug) + + cfg := &config.Plugin{ + Version: "2024.2.0", + Path: "configs/.rr-in-memory-memory.yaml", + } + + err := cont.RegisterAll( + cfg, + &kv.Plugin{}, + &memory.Plugin{}, + &otel.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + + conn, err := net.Dial("tcp", "127.0.0.1:6666") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + tt := time.Now().Add(time.Minute * 10).Format(time.RFC3339) + data := &kvProto.Request{ + Storage: "memory-rr", + Items: []*kvProto.Item{ + { + Key: "a", + Value: []byte("aa"), + Timeout: tt, + }, + { + Key: "b", + Value: []byte("bb"), + Timeout: tt, + }, + { + Key: "c", + Value: []byte("cc"), + Timeout: tt, + }, + { + Key: "d", + Value: []byte("dd"), + Timeout: tt, + }, + }, + } + + ret := &kvProto.Response{} + for i := 0; i < 1000000; i++ { + t.Log("Number Goroutines: ", runtime.NumGoroutine()) + err = client.Call("kv.Set", data, ret) + require.NoError(t, err) + } + + t.Log("sleeping") + time.Sleep(time.Minute * 1) + + stopCh <- struct{}{} + wg.Wait() +} + func TestInMemory(t *testing.T) { cont := endure.New(slog.LevelDebug) From e48d0e3518cba4390b8838c6e9626667b9ca4452 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 19 Nov 2024 03:23:45 +0100 Subject: [PATCH 2/7] fix: properly check the callback for nil Signed-off-by: Valery Piashchynski --- memorykv/kv.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/memorykv/kv.go b/memorykv/kv.go index 4681f8a..99e8b29 100644 --- a/memorykv/kv.go +++ b/memorykv/kv.go @@ -195,6 +195,7 @@ func (d *Driver) Set(items ...kv.Item) error { // create callback to delete the key from the heap stopCh, updateCh := d.ttlcallback(items[i].Key(), tm, *d.broadcastStopCh.Load()) + d.log.Debug("saving item with TTL", zap.String("key", items[i].Key()), zap.String("ttl", items[i].Timeout())) d.heap.Set(items[i].Key(), &Item{ key: items[i].Key(), value: items[i].Value(), @@ -247,13 +248,18 @@ func (d *Driver) MExpire(items ...kv.Item) error { ttm = 0 } - if clb, ok := d.heap.Get(items[i].Key()); ok { + // check if the key exists and has a callback + if clb, ok := d.heap.Get(items[i].Key()); ok && clb.callback != nil { // send new ttl to the callback clb.callback.updateCh <- ttm + // if not -> set the callback } else { // we should set the callback // create callback to delete the key from the heap stopCh, updateCh := d.ttlcallback(items[i].Key(), ttm, *d.broadcastStopCh.Load()) + // just to be sure + d.heap.removeEntry(items[i].Key()) + // set the item with the callback d.heap.Set(items[i].Key(), &Item{ key: items[i].Key(), value: items[i].Value(), From 48ae9f697a76e60de19fd34008865210a0450ea2 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 19 Nov 2024 03:25:19 +0100 Subject: [PATCH 3/7] chore: add some docs Signed-off-by: Valery Piashchynski --- memorykv/map.go | 1 + 1 file changed, 1 insertion(+) diff --git a/memorykv/map.go b/memorykv/map.go index bc798e9..7c5daec 100644 --- a/memorykv/map.go +++ b/memorykv/map.go @@ -71,6 +71,7 @@ func (h *hmap) Delete(key string) { delete(h.items, key) } +// IMPORTANT: Only use this method when the callback has already been cleaned up. func (h *hmap) removeEntry(key string) { h.mu.Lock() defer h.mu.Unlock() From 881bb6ef3215a3baaa5a78f29d7379e81d253999 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 19 Nov 2024 03:29:06 +0100 Subject: [PATCH 4/7] chore: prevent blocking on channels Signed-off-by: Valery Piashchynski --- memorykv/map.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/memorykv/map.go b/memorykv/map.go index 7c5daec..00c44ef 100644 --- a/memorykv/map.go +++ b/memorykv/map.go @@ -37,7 +37,10 @@ func (h *hmap) LoadAndDelete(key string) (*Item, bool) { item, ok := h.items[key] if ok { if item.callback != nil { - item.callback.stopCh <- struct{}{} + select { + case item.callback.stopCh <- struct{}{}: + default: + } } delete(h.items, key) } @@ -52,12 +55,17 @@ func (h *hmap) Clean() { for k, v := range h.items { if v != nil { if v.callback != nil { - v.callback.stopCh <- struct{}{} + select { + case v.callback.stopCh <- struct{}{}: + default: + } } } delete(h.items, k) } + + h.items = make(map[string]*Item, 10) } func (h *hmap) Delete(key string) { @@ -65,7 +73,10 @@ func (h *hmap) Delete(key string) { defer h.mu.Unlock() if h.items[key] != nil && h.items[key].callback != nil { - h.items[key].callback.stopCh <- struct{}{} + select { + case h.items[key].callback.stopCh <- struct{}{}: + default: + } } delete(h.items, key) From 1b8af582c13d3354df17f6a5fe3b619cbc62a4ff Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 19 Nov 2024 03:30:13 +0100 Subject: [PATCH 5/7] chore: redundand code Signed-off-by: Valery Piashchynski --- memorykv/kv.go | 1 - 1 file changed, 1 deletion(-) diff --git a/memorykv/kv.go b/memorykv/kv.go index 99e8b29..dc66172 100644 --- a/memorykv/kv.go +++ b/memorykv/kv.go @@ -346,7 +346,6 @@ func (d *Driver) Clear() error { newCh := make(chan struct{}) d.broadcastStopCh.Swap(&newCh) d.heap.Clean() - d.heap = newHMap() return nil } From 9b6e1dfbb9ac942238259a80e8fdfe217a31a935 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 19 Nov 2024 04:00:43 +0100 Subject: [PATCH 6/7] chore: add memory and goroutine leak test Signed-off-by: Valery Piashchynski --- tests/kv_memory_test.go | 40 +++++++++++++++++++++++++++++++++++----- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/tests/kv_memory_test.go b/tests/kv_memory_test.go index 651b335..44ec00d 100644 --- a/tests/kv_memory_test.go +++ b/tests/kv_memory_test.go @@ -92,10 +92,13 @@ func TestInMemoryOrder(t *testing.T) { time.Sleep(time.Second * 1) stopCh <- struct{}{} wg.Wait() + + t.Cleanup(func() { + + }) } func TestSetManyMemory(t *testing.T) { - t.Skip("This test is executed manually") cont := endure.New(slog.LevelDebug) cfg := &config.Plugin{ @@ -158,6 +161,11 @@ func TestSetManyMemory(t *testing.T) { time.Sleep(time.Second * 1) + ms := &runtime.MemStats{} + runtime.ReadMemStats(ms) + prevAlloc := ms.Alloc + ngprev := runtime.NumGoroutine() + conn, err := net.Dial("tcp", "127.0.0.1:6666") assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) @@ -190,14 +198,33 @@ func TestSetManyMemory(t *testing.T) { } ret := &kvProto.Response{} - for i := 0; i < 1000000; i++ { - t.Log("Number Goroutines: ", runtime.NumGoroutine()) + for i := 0; i < 10000; i++ { err = client.Call("kv.Set", data, ret) require.NoError(t, err) } + runtime.GC() + + ms = &runtime.MemStats{} + runtime.ReadMemStats(ms) + currAlloc := ms.Alloc + currNg := runtime.NumGoroutine() + + if currAlloc-prevAlloc > 10000000 { // 10MB + t.Log("Prev alloc", prevAlloc) + t.Log("Curr alloc", currAlloc) + t.Error("Memory leak detected") + } + + if currNg-ngprev > 10 { + t.Log("Prev ng", ngprev) + t.Log("Curr ng", currNg) + t.Error("Goroutine leak detected") + } - t.Log("sleeping") - time.Sleep(time.Minute * 1) + time.Sleep(time.Second * 5) + + err = client.Call("kv.Clear", data, ret) + require.NoError(t, err) stopCh <- struct{}{} wg.Wait() @@ -461,4 +488,7 @@ func testRPCMethodsInMemory(t *testing.T) { err = client.Call("kv.Has", dataClear, ret) assert.NoError(t, err) assert.Len(t, ret.GetItems(), 0) // should be 5 + + err = client.Call("kv.Clear", data, ret) + require.NoError(t, err) } From 66a08bd22ba6f905273a39ce53c3ba79d97b6431 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 19 Nov 2024 13:54:09 +0100 Subject: [PATCH 7/7] chore: close the connection Signed-off-by: Valery Piashchynski --- tests/kv_memory_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/kv_memory_test.go b/tests/kv_memory_test.go index 44ec00d..240219b 100644 --- a/tests/kv_memory_test.go +++ b/tests/kv_memory_test.go @@ -169,6 +169,9 @@ func TestSetManyMemory(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6666") assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + defer func() { + _ = client.Close() + }() tt := time.Now().Add(time.Minute * 10).Format(time.RFC3339) data := &kvProto.Request{