From 9bb67b49a7de4c201098977432d29007b469454d Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 23 Nov 2020 13:18:55 -0800 Subject: [PATCH 1/8] rename mutex map files --- utils/{priority_mutex_map.go => mutex_map.go} | 0 utils/{priority_mutex_map_test.go => mutex_map_test.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename utils/{priority_mutex_map.go => mutex_map.go} (100%) rename utils/{priority_mutex_map_test.go => mutex_map_test.go} (100%) diff --git a/utils/priority_mutex_map.go b/utils/mutex_map.go similarity index 100% rename from utils/priority_mutex_map.go rename to utils/mutex_map.go diff --git a/utils/priority_mutex_map_test.go b/utils/mutex_map_test.go similarity index 100% rename from utils/priority_mutex_map_test.go rename to utils/mutex_map_test.go From 9002be3c60c9dbc351dabc5d39635c8552e214fe Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 23 Nov 2020 13:35:42 -0800 Subject: [PATCH 2/8] first pass at sharded map --- go.mod | 1 + go.sum | 2 ++ utils/sharded_map.go | 71 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 74 insertions(+) create mode 100644 utils/sharded_map.go diff --git a/go.mod b/go.mod index 076a3394..0299eeca 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/lucasjones/reggen v0.0.0-20180717132126-cdb49ff09d77 github.com/mitchellh/mapstructure v1.3.3 github.com/pkg/errors v0.9.1 // indirect + github.com/segmentio/fasthash v1.0.3 github.com/stretchr/objx v0.1.1 // indirect github.com/stretchr/testify v1.6.1 github.com/tidwall/gjson v1.6.3 diff --git a/go.sum b/go.sum index 152d070d..a5b7b9f5 100644 --- a/go.sum +++ b/go.sum @@ -226,6 +226,8 @@ github.com/rs/cors v0.0.0-20160617231935-a62a804a8a00/go.mod h1:gFx+x8UowdsKA9Ac github.com/rs/xhandler v0.0.0-20160618193221-ed27b6fd6521/go.mod h1:RvLn4FgxWubrpZHtQLnOf6EwhN2hEMusxZOhcW9H3UQ= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/segmentio/fasthash v1.0.3 h1:EI9+KE1EwvMLBWwjpRDc+fEM+prwxDYbslddQGtrmhM= +github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY= github.com/shirou/gopsutil v2.20.5+incompatible h1:tYH07UPoQt0OCQdgWWMgYHy3/a9bcxNpBIysykNIP7I= github.com/shirou/gopsutil v2.20.5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= diff --git a/utils/sharded_map.go b/utils/sharded_map.go new file mode 100644 index 00000000..16464cec --- /dev/null +++ b/utils/sharded_map.go @@ -0,0 +1,71 @@ +package utils + +import ( + "github.com/segmentio/fasthash/fnv1a" +) + +const ( + // DefaultShards is the default number of shards + // to use in ShardedMap. + DefaultShards = 256 +) + +// shardMapEntry governs access to the shard of +// the map contained at a particular index. +type shardMapEntry struct { + mutex *PriorityMutex + entries map[string]interface{} +} + +// ShardedMap allows concurrent writes +// to a map by sharding the map into some +// number of independently locked subsections. +type ShardedMap struct { + shards []*shardMapEntry +} + +// NewShardedMap creates a new *ShardedMap +// with some number of shards. The larger the +// number provided for shards, the less lock +// contention there will be. +// +// As a rule of thumb, shards should usually +// be set to the concurrency of the caller. +func NewShardedMap(shards int) *ShardedMap { + m := &ShardedMap{ + shards: make([]*shardMapEntry, shards), + } + + for i := 0; i < shards; i++ { + m.shards[i] = &shardMapEntry{ + entries: map[string]interface{}{}, + mutex: new(PriorityMutex), + } + } + + return m +} + +// shardIndex returns the index of the shard +// that could contain the key. +func (m *ShardedMap) shardIndex(key string) int { + return int(fnv1a.HashString32(key) % uint32(len(m.shards))) +} + +// Lock acquires the lock for a shard that could contain +// the key. This syntax allows the caller to perform multiple +// operations while holding the lock for a single shard. +func (m *ShardedMap) Lock(key string, priority bool) map[string]interface{} { + shardIndex := m.shardIndex(key) + shard := m.shards[shardIndex] + shard.mutex.Lock(priority) + return shard.entries +} + +// Unlock releases the lock for a shard that could contain +// the key. +func (m *ShardedMap) Unlock(key string) { + shardIndex := m.shardIndex(key) + shard := m.shards[shardIndex] + shard.mutex.Unlock() +} From 787e772df1c8112846e239ac4ba8b9e9ebc5beaa Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 23 Nov 2020 13:47:33 -0800 Subject: [PATCH 3/8] Pass test --- utils/sharded_map_test.go | 55 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 utils/sharded_map_test.go diff --git a/utils/sharded_map_test.go b/utils/sharded_map_test.go new file mode 100644 index 00000000..b10a705a --- /dev/null +++ b/utils/sharded_map_test.go @@ -0,0 +1,55 @@ +package utils + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "golang.org/x/sync/errgroup" +) + +func TestShardedMap(t *testing.T) { + m := NewShardedMap(2) + g, _ := errgroup.WithContext(context.Background()) + + // To test locking, we use channels + // that will cause deadlock if not executed + // concurrently. + a := make(chan struct{}) + b := make(chan struct{}) + + g.Go(func() error { + s := m.Lock("a", false) + assert.Len(t, s, 0) + s["test"] = "a" + <-a + close(b) + m.Unlock("a") + return nil + }) + + g.Go(func() error { + s := m.Lock("b", false) + assert.Len(t, s, 0) + s["test"] = "b" + close(a) + <-b + m.Unlock("b") + return nil + }) + + time.Sleep(1 * time.Second) + assert.NoError(t, g.Wait()) + + // Ensure keys set correctly + s := m.Lock("a", false) + assert.Len(t, s, 1) + assert.Equal(t, s["test"], "a") + m.Unlock("a") + + s = m.Lock("b", false) + assert.Len(t, s, 1) + assert.Equal(t, s["test"], "b") + m.Unlock("b") +} From 7c712115763f69ed78abf2e3f43fb9645a67f285 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 23 Nov 2020 13:48:09 -0800 Subject: [PATCH 4/8] add license --- utils/sharded_map.go | 14 ++++++++++++++ utils/sharded_map_test.go | 14 ++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/utils/sharded_map.go b/utils/sharded_map.go index 16464cec..a2d91c02 100644 --- a/utils/sharded_map.go +++ b/utils/sharded_map.go @@ -1,3 +1,17 @@ +// Copyright 2020 Coinbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package utils import ( diff --git a/utils/sharded_map_test.go b/utils/sharded_map_test.go index b10a705a..a9815bd1 100644 --- a/utils/sharded_map_test.go +++ b/utils/sharded_map_test.go @@ -1,3 +1,17 @@ +// Copyright 2020 Coinbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package utils import ( From 37f3933f4a8a18d58ddadd8ed064de23b280a72e Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 23 Nov 2020 14:28:38 -0800 Subject: [PATCH 5/8] Migrate mutex map to shared map --- utils/mutex_map.go | 36 +++++++++++++++++++++--------------- utils/mutex_map_test.go | 20 +++++++++++++++----- 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/utils/mutex_map.go b/utils/mutex_map.go index c0c0a704..4def4e34 100644 --- a/utils/mutex_map.go +++ b/utils/mutex_map.go @@ -18,6 +18,10 @@ import ( "sync" ) +const ( + unlockPriority = true +) + // MutexMap is a struct that allows for // acquiring a *PriorityMutex via a string identifier // or for acquiring a global mutex that blocks @@ -26,8 +30,7 @@ import ( // This is useful for coordinating concurrent, non-overlapping // writes in the storage package. type MutexMap struct { - entries map[string]*mutexMapEntry - mutex sync.Mutex + entries *ShardedMap globalMutex sync.RWMutex } @@ -39,9 +42,9 @@ type mutexMapEntry struct { } // NewMutexMap returns a new *MutexMap. -func NewMutexMap() *MutexMap { +func NewMutexMap(shards int) *MutexMap { return &MutexMap{ - entries: map[string]*mutexMapEntry{}, + entries: NewShardedMap(shards), } } @@ -70,20 +73,23 @@ func (m *MutexMap) Lock(identifier string, priority bool) { // We acquire m when adding items to m.table // so that we don't accidentally overwrite // lock created by another goroutine. - m.mutex.Lock() - l, ok := m.entries[identifier] + data := m.entries.Lock(identifier, priority) + raw, ok := data[identifier] + var entry *mutexMapEntry if !ok { - l = &mutexMapEntry{ + entry = &mutexMapEntry{ lock: new(PriorityMutex), } - m.entries[identifier] = l + data[identifier] = entry + } else { + entry = raw.(*mutexMapEntry) } - l.count++ - m.mutex.Unlock() + entry.count++ + m.entries.Unlock(identifier) // Once we have a m.globalMutex.RLock, it is // safe to acquire an identifier lock. - l.lock.Lock(priority) + entry.lock.Lock(priority) } // Unlock releases a lock held for a particular identifier. @@ -92,15 +98,15 @@ func (m *MutexMap) Unlock(identifier string) { // exist by the time we unlock, otherwise // it would not have been possible to get // the lock to begin with. - m.mutex.Lock() - entry := m.entries[identifier] + data := m.entries.Lock(identifier, unlockPriority) + entry := data[identifier].(*mutexMapEntry) if entry.count <= 1 { // this should never be < 0 - delete(m.entries, identifier) + delete(data, identifier) } else { entry.count-- entry.lock.Unlock() } - m.mutex.Unlock() + m.entries.Unlock(identifier) // We release the globalMutex after unlocking // the identifier lock, otherwise it would be possible diff --git a/utils/mutex_map_test.go b/utils/mutex_map_test.go index aec5da6b..b3cd9931 100644 --- a/utils/mutex_map_test.go +++ b/utils/mutex_map_test.go @@ -25,7 +25,7 @@ import ( func TestMutexMap(t *testing.T) { arr := []string{} - m := NewMutexMap() + m := NewMutexMap(DefaultShards) g, _ := errgroup.WithContext(context.Background()) // Lock while adding all locks @@ -47,7 +47,8 @@ func TestMutexMap(t *testing.T) { g.Go(func() error { m.Lock("a", false) - assert.Equal(t, m.entries["a"].count, 1) + entry := m.entries.shards[m.entries.shardIndex("a")].entries["a"].(*mutexMapEntry) + assert.Equal(t, entry.count, 1) <-a arr = append(arr, "a") close(b) @@ -57,7 +58,8 @@ func TestMutexMap(t *testing.T) { g.Go(func() error { m.Lock("b", false) - assert.Equal(t, m.entries["b"].count, 1) + entry := m.entries.shards[m.entries.shardIndex("b")].entries["b"].(*mutexMapEntry) + assert.Equal(t, entry.count, 1) close(a) <-b arr = append(arr, "b") @@ -68,7 +70,11 @@ func TestMutexMap(t *testing.T) { time.Sleep(1 * time.Second) // Ensure number of expected locks is correct - assert.Len(t, m.entries, 0) + assert.Equal( + t, + len(m.entries.shards[m.entries.shardIndex("a")].entries)+len(m.entries.shards[m.entries.shardIndex("b")].entries), + 0, + ) arr = append(arr, "global-a") m.GUnlock() assert.NoError(t, g.Wait()) @@ -83,5 +89,9 @@ func TestMutexMap(t *testing.T) { }, arr) // Ensure lock is no longer occupied - assert.Len(t, m.entries, 0) + assert.Equal( + t, + len(m.entries.shards[m.entries.shardIndex("a")].entries)+len(m.entries.shards[m.entries.shardIndex("b")].entries), + 0, + ) } From dadd28512dedc819ed531c29b96cdbc0bb755938 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 23 Nov 2020 14:32:52 -0800 Subject: [PATCH 6/8] shorten lines --- utils/mutex_map_test.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/utils/mutex_map_test.go b/utils/mutex_map_test.go index b3cd9931..fa723afa 100644 --- a/utils/mutex_map_test.go +++ b/utils/mutex_map_test.go @@ -72,7 +72,11 @@ func TestMutexMap(t *testing.T) { // Ensure number of expected locks is correct assert.Equal( t, - len(m.entries.shards[m.entries.shardIndex("a")].entries)+len(m.entries.shards[m.entries.shardIndex("b")].entries), + len( + m.entries.shards[m.entries.shardIndex("a")].entries, + )+len( + m.entries.shards[m.entries.shardIndex("b")].entries, + ), 0, ) arr = append(arr, "global-a") @@ -91,7 +95,11 @@ func TestMutexMap(t *testing.T) { // Ensure lock is no longer occupied assert.Equal( t, - len(m.entries.shards[m.entries.shardIndex("a")].entries)+len(m.entries.shards[m.entries.shardIndex("b")].entries), + len( + m.entries.shards[m.entries.shardIndex("a")].entries, + )+len( + m.entries.shards[m.entries.shardIndex("b")].entries, + ), 0, ) } From 79e92bddb085304585c2365dac3d812ef8c82970 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 23 Nov 2020 14:43:24 -0800 Subject: [PATCH 7/8] Increase limit size --- storage/badger_storage_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/badger_storage_test.go b/storage/badger_storage_test.go index 804552b7..740f69de 100644 --- a/storage/badger_storage_test.go +++ b/storage/badger_storage_test.go @@ -309,7 +309,7 @@ func TestBadgerTrain_Limit(t *testing.T) { namespace, newDir, dictionaryPath, - 10, + 50, []*CompressorEntry{}, ) assert.NoError(t, err) From 7b5cef380657675fa7f744fa4fd233af13464030 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 23 Nov 2020 14:52:34 -0800 Subject: [PATCH 8/8] nits --- utils/mutex_map_test.go | 24 ++++++------------------ 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/utils/mutex_map_test.go b/utils/mutex_map_test.go index fa723afa..5c2a8d38 100644 --- a/utils/mutex_map_test.go +++ b/utils/mutex_map_test.go @@ -70,15 +70,9 @@ func TestMutexMap(t *testing.T) { time.Sleep(1 * time.Second) // Ensure number of expected locks is correct - assert.Equal( - t, - len( - m.entries.shards[m.entries.shardIndex("a")].entries, - )+len( - m.entries.shards[m.entries.shardIndex("b")].entries, - ), - 0, - ) + totalKeys := len(m.entries.shards[m.entries.shardIndex("a")].entries) + + len(m.entries.shards[m.entries.shardIndex("b")].entries) + assert.Equal(t, totalKeys, 0) arr = append(arr, "global-a") m.GUnlock() assert.NoError(t, g.Wait()) @@ -93,13 +87,7 @@ func TestMutexMap(t *testing.T) { }, arr) // Ensure lock is no longer occupied - assert.Equal( - t, - len( - m.entries.shards[m.entries.shardIndex("a")].entries, - )+len( - m.entries.shards[m.entries.shardIndex("b")].entries, - ), - 0, - ) + totalKeys = len(m.entries.shards[m.entries.shardIndex("a")].entries) + + len(m.entries.shards[m.entries.shardIndex("b")].entries) + assert.Equal(t, totalKeys, 0) }