From ed507825532eb9282125b9e3a6e1c51a5750ecc6 Mon Sep 17 00:00:00 2001
From: Rod Vagg
Date: Sat, 20 Nov 2021 07:21:29 +1100
Subject: [PATCH] feat!(requestmanager): remove request allocation backpressure (#272)

* feat!(requestmanager): remove request allocation backpressure

Closes: https://github.com/ipfs/go-graphsync/issues/241
Ref: 9171ce6bccd62bf4303c12edd07e278713e6da06

* fixup! feat!(requestmanager): remove request allocation backpressure

Co-authored-by: Hannah Howard
---
 impl/graphsync.go                          | 36 +-------------------
 requestmanager/asyncloader/asyncloader.go  | 34 ++++--------------
 .../asyncloader/asyncloader_test.go        |  4 +--
 .../responsecache/responsecache.go         | 10 ++----
 .../responsecache/responsecache_test.go    |  5 +--
 5 files changed, 12 insertions(+), 77 deletions(-)

diff --git a/impl/graphsync.go b/impl/graphsync.go
index dcc7ad97..fe26fed1 100644
--- a/impl/graphsync.go
+++ b/impl/graphsync.go
@@ -70,14 +70,11 @@ type GraphSync struct {
 	ctx               context.Context
 	cancel            context.CancelFunc
 	responseAllocator *allocator.Allocator
-	requestAllocator  *allocator.Allocator
 }
 
 type graphsyncConfigOptions struct {
 	totalMaxMemoryResponder              uint64
 	maxMemoryPerPeerResponder            uint64
-	totalMaxMemoryRequestor              uint64
-	maxMemoryPerPeerRequestor            uint64
 	maxInProgressIncomingRequests        uint64
 	maxInProgressIncomingRequestsPerPeer uint64
 	maxInProgressOutgoingRequests        uint64
@@ -116,22 +113,6 @@ func MaxMemoryPerPeerResponder(maxMemoryPerPeer uint64) Option {
 	}
 }
 
-// MaxMemoryRequestor defines the maximum amount of memory the responder
-// may consume queueing up messages for a response in total
-func MaxMemoryRequestor(totalMaxMemory uint64) Option {
-	return func(gs *graphsyncConfigOptions) {
-		gs.totalMaxMemoryRequestor = totalMaxMemory
-	}
-}
-
-// MaxMemoryPerPeerRequestor defines the maximum amount of memory a peer
-// may consume queueing up messages for a response
-func MaxMemoryPerPeerRequestor(maxMemoryPerPeer uint64) Option {
-	return func(gs *graphsyncConfigOptions) {
-		gs.maxMemoryPerPeerRequestor = maxMemoryPerPeer
-	}
-}
-
 // MaxInProgressIncomingRequests changes the maximum number of
 // incoming graphsync requests that are processed in parallel (default 6)
 func MaxInProgressIncomingRequests(maxInProgressIncomingRequests uint64) Option {
@@ -214,8 +195,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
 	gsConfig := &graphsyncConfigOptions{
 		totalMaxMemoryResponder:       defaultTotalMaxMemory,
 		maxMemoryPerPeerResponder:     defaultMaxMemoryPerPeer,
-		totalMaxMemoryRequestor:       defaultTotalMaxMemory,
-		maxMemoryPerPeerRequestor:     defaultMaxMemoryPerPeer,
 		maxInProgressIncomingRequests: defaultMaxInProgressRequests,
 		maxInProgressOutgoingRequests: defaultMaxInProgressRequests,
 		registerDefaultValidator:      true,
@@ -247,9 +226,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
 		return messagequeue.New(ctx, p, network, responseAllocator, gsConfig.messageSendRetries, gsConfig.sendMessageTimeout)
 	}
 	peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
-	requestAllocator := allocator.NewAllocator(gsConfig.totalMaxMemoryRequestor, gsConfig.maxMemoryPerPeerRequestor)
 
-	asyncLoader := asyncloader.New(ctx, linkSystem, requestAllocator)
+	asyncLoader := asyncloader.New(ctx, linkSystem)
 	requestQueue := taskqueue.NewTaskQueue(ctx)
 	requestManager := requestmanager.New(ctx, asyncLoader, linkSystem, outgoingRequestHooks, incomingResponseHooks, networkErrorListeners, outgoingRequestProcessingListeners, requestQueue, network.ConnectionManager(), gsConfig.maxLinksPerOutgoingRequest)
 	requestExecutor := executor.NewExecutor(requestManager, incomingBlockHooks, asyncLoader.AsyncLoad)
@@ -313,7 +291,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
 		ctx:               ctx,
 		cancel:            cancel,
 		responseAllocator: responseAllocator,
-		requestAllocator:  requestAllocator,
 	}
 
 	requestManager.SetDelegate(peerManager)
@@ -453,7 +430,6 @@ func (gs *GraphSync) CancelRequest(ctx context.Context, requestID graphsync.Requ
 // Stats produces insight on the current state of a graphsync exchange
 func (gs *GraphSync) Stats() graphsync.Stats {
 	outgoingRequestStats := gs.requestQueue.Stats()
-	incomingResponseStats := gs.requestAllocator.Stats()
 	ptqstats := gs.peerTaskQueue.Stats()
 
 	incomingRequestStats := graphsync.RequestStats{
@@ -465,8 +441,6 @@ func (gs *GraphSync) Stats() graphsync.Stats {
 	return graphsync.Stats{
 		OutgoingRequests:  outgoingRequestStats,
-		IncomingResponses: incomingResponseStats,
-
 		IncomingRequests:  incomingRequestStats,
 		OutgoingResponses: outgoingResponseStats,
 	}
@@ -485,14 +459,6 @@ func (gsr *graphSyncReceiver) ReceiveMessage(
 	sender peer.ID,
 	incoming gsmsg.GraphSyncMessage) {
 	gsr.graphSync().responseManager.ProcessRequests(ctx, sender, incoming.Requests())
-	totalMemoryAllocated := uint64(0)
-	for _, blk := range incoming.Blocks() {
-		totalMemoryAllocated += uint64(len(blk.RawData()))
-	}
-	select {
-	case <-gsr.graphSync().requestAllocator.AllocateBlockMemory(sender, totalMemoryAllocated):
-	case <-gsr.ctx.Done():
-	}
 	gsr.graphSync().requestManager.ProcessResponses(sender, incoming.Responses(), incoming.Blocks())
 }
 
diff --git a/requestmanager/asyncloader/asyncloader.go b/requestmanager/asyncloader/asyncloader.go
index 54a1e9ff..596e3d5a 100644
--- a/requestmanager/asyncloader/asyncloader.go
+++ b/requestmanager/asyncloader/asyncloader.go
@@ -8,7 +8,6 @@ import (
 	"sync"
 
 	blocks "github.com/ipfs/go-block-format"
-	logging "github.com/ipfs/go-log/v2"
 	"github.com/ipld/go-ipld-prime"
 	peer "github.com/libp2p/go-libp2p-core/peer"
 
@@ -20,24 +19,16 @@ import (
 	"github.com/ipfs/go-graphsync/requestmanager/types"
 )
 
-var log = logging.Logger("gs-asyncloader")
-
 type alternateQueue struct {
 	responseCache    *responsecache.ResponseCache
 	loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
 }
 
-// Allocator indicates a mechanism for tracking memory used by a given peer
-type Allocator interface {
-	ReleaseBlockMemory(p peer.ID, amount uint64) error
-}
-
 // AsyncLoader manages loading links asynchronously in as new responses
 // come in from the network
 type AsyncLoader struct {
-	ctx       context.Context
-	cancel    context.CancelFunc
-	allocator Allocator
+	ctx    context.Context
+	cancel context.CancelFunc
 
 	// this mutex protects access to the state of the async loader, which covers all data fields below
 	stateLk sync.Mutex
@@ -50,8 +41,8 @@ type AsyncLoader struct {
 
 // New initializes a new link loading manager for asynchronous loads from the given context
 // and local store loading and storing function
-func New(ctx context.Context, linkSystem ipld.LinkSystem, allocator Allocator) *AsyncLoader {
-	responseCache, loadAttemptQueue := setupAttemptQueue(linkSystem, allocator)
+func New(ctx context.Context, linkSystem ipld.LinkSystem) *AsyncLoader {
+	responseCache, loadAttemptQueue := setupAttemptQueue(linkSystem)
 	ctx, cancel := context.WithCancel(ctx)
 	return &AsyncLoader{
 		ctx:    ctx,
@@ -61,7 +52,6 @@ func New(ctx context.Context, linkSystem ipld.LinkSystem, allocator Allocator) *
 		alternateQueues:  make(map[string]alternateQueue),
 		responseCache:    responseCache,
 		loadAttemptQueue: loadAttemptQueue,
-		allocator:        allocator,
 	}
 }
 
@@ -73,7 +63,7 @@ func (al *AsyncLoader) RegisterPersistenceOption(name string, lsys ipld.LinkSyst
 	if existing {
 		return errors.New("already registerd a persistence option with this name")
 	}
-	responseCache, loadAttemptQueue := setupAttemptQueue(lsys, al.allocator)
+	responseCache, loadAttemptQueue := setupAttemptQueue(lsys)
 	al.alternateQueues[name] = alternateQueue{responseCache, loadAttemptQueue}
 	return nil
 }
@@ -170,13 +160,7 @@ func (al *AsyncLoader) CleanupRequest(p peer.ID, requestID graphsync.RequestID)
 		responseCache = al.alternateQueues[aq].responseCache
 		delete(al.requestQueues, requestID)
 	}
-	toFree := responseCache.FinishRequest(requestID)
-	if toFree > 0 {
-		err := al.allocator.ReleaseBlockMemory(p, toFree)
-		if err != nil {
-			log.Infow("Error deallocating requestor memory", "p", p, "toFree", toFree, "err", err)
-		}
-	}
+	responseCache.FinishRequest(requestID)
 }
 
 func (al *AsyncLoader) getLoadAttemptQueue(queue string) *loadattemptqueue.LoadAttemptQueue {
@@ -193,7 +177,7 @@ func (al *AsyncLoader) getResponseCache(queue string) *responsecache.ResponseCac
 	return al.alternateQueues[queue].responseCache
 }
 
-func setupAttemptQueue(lsys ipld.LinkSystem, allocator Allocator) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) {
+func setupAttemptQueue(lsys ipld.LinkSystem) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) {
 	unverifiedBlockStore := unverifiedblockstore.New(lsys.StorageWriteOpener)
 	responseCache := responsecache.New(unverifiedBlockStore)
@@ -204,10 +188,6 @@ func setupAttemptQueue(lsys ipld.LinkSystem, allocator Allocator) (*responsecach
 			return types.AsyncLoadResult{Err: err, Local: false}
 		}
 		if data != nil {
-			err = allocator.ReleaseBlockMemory(p, uint64(len(data)))
-			if err != nil {
-				log.Warningf("releasing block memory: %s", err.Error())
-			}
 			return types.AsyncLoadResult{Data: data, Local: false}
 		}
 		// fall back to local store
diff --git a/requestmanager/asyncloader/asyncloader_test.go b/requestmanager/asyncloader/asyncloader_test.go
index fcde371c..c1f7b858 100644
--- a/requestmanager/asyncloader/asyncloader_test.go
+++ b/requestmanager/asyncloader/asyncloader_test.go
@@ -13,7 +13,6 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"github.com/ipfs/go-graphsync"
-	"github.com/ipfs/go-graphsync/allocator"
 	"github.com/ipfs/go-graphsync/metadata"
 	"github.com/ipfs/go-graphsync/requestmanager/types"
 	"github.com/ipfs/go-graphsync/testutil"
@@ -385,8 +384,7 @@ func withLoader(st *store, exec func(ctx context.Context, asyncLoader *AsyncLoad
 	ctx := context.Background()
 	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
 	defer cancel()
-	allocator := allocator.NewAllocator(256*(1<<20), 16*(1<<20))
-	asyncLoader := New(ctx, st.lsys, allocator)
+	asyncLoader := New(ctx, st.lsys)
 	exec(ctx, asyncLoader)
 }
 
diff --git a/requestmanager/asyncloader/responsecache/responsecache.go b/requestmanager/asyncloader/responsecache/responsecache.go
index d1d0b57b..31877405 100644
--- a/requestmanager/asyncloader/responsecache/responsecache.go
+++ b/requestmanager/asyncloader/responsecache/responsecache.go
@@ -44,20 +44,14 @@ func New(unverifiedBlockStore UnverifiedBlockStore) *ResponseCache {
 // FinishRequest indicate there is no more need to track blocks tied to this
-// response. It returns the total number of bytes in blocks that were being
-// tracked but are no longer in memory
-func (rc *ResponseCache) FinishRequest(requestID graphsync.RequestID) uint64 {
+// response
+func (rc *ResponseCache) FinishRequest(requestID graphsync.RequestID) {
 	rc.responseCacheLk.Lock()
 	rc.linkTracker.FinishRequest(requestID)
-	toFree := uint64(0)
 	rc.unverifiedBlockStore.PruneBlocks(func(link ipld.Link, amt uint64) bool {
-		shouldPrune := rc.linkTracker.BlockRefCount(link) == 0
-		if shouldPrune {
-			toFree += amt
-		}
-		return shouldPrune
+		return rc.linkTracker.BlockRefCount(link) == 0
 	})
 	rc.responseCacheLk.Unlock()
-	return toFree
 }
 
 // AttemptLoad attempts to load the given block from the cache
diff --git a/requestmanager/asyncloader/responsecache/responsecache_test.go b/requestmanager/asyncloader/responsecache/responsecache_test.go
index aecf1aea..22de3563 100644
--- a/requestmanager/asyncloader/responsecache/responsecache_test.go
+++ b/requestmanager/asyncloader/responsecache/responsecache_test.go
@@ -134,16 +134,13 @@ func TestResponseCacheManagingLinks(t *testing.T) {
 	require.NoError(t, err)
 	require.Nil(t, data, "no data should be returned for unknown block")
 
-	toFree := responseCache.FinishRequest(requestID1)
+	responseCache.FinishRequest(requestID1)
 	// should remove only block 0, since it now has no referring outstanding requests
 	require.Len(t, fubs.blocks(), len(blks)-4, "should prune block when it is orphaned")
 	testutil.RefuteContainsBlock(t, fubs.blocks(), blks[0])
-	require.Equal(t, toFree, uint64(len(blks[0].RawData())))
 
 	responseCache.FinishRequest(requestID2)
 	// should remove last block since there are no remaining references
 	require.Len(t, fubs.blocks(), 0, "should prune block when it is orphaned")
 	testutil.RefuteContainsBlock(t, fubs.blocks(), blks[3])
-	require.Equal(t, toFree, uint64(len(blks[3].RawData())))
-
 }
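
For downstream users, the breaking surface of this patch is that the MaxMemoryRequestor and MaxMemoryPerPeerRequestor options are gone, graphsync.Stats no longer reports IncomingResponses, and incoming blocks now flow from ReceiveMessage straight into the request manager instead of blocking on AllocateBlockMemory. A minimal sketch of constructing an instance against the post-patch API follows; the function name newGraphSync and the limit values are illustrative only, and MaxMemoryResponder is assumed to exist alongside the MaxMemoryPerPeerResponder option shown in the diff above:

package example

import (
	"context"

	"github.com/ipld/go-ipld-prime"

	graphsync "github.com/ipfs/go-graphsync"
	gsimpl "github.com/ipfs/go-graphsync/impl"
	gsnet "github.com/ipfs/go-graphsync/network"
)

// newGraphSync wires up a GraphSync instance with only responder-side
// memory backpressure configured. The requestor-side equivalents were
// removed by this patch, so unverified incoming blocks are buffered
// without a library-imposed limit until their request completes.
func newGraphSync(ctx context.Context, network gsnet.GraphSyncNetwork, lsys ipld.LinkSystem) graphsync.GraphExchange {
	return gsimpl.New(ctx, network, lsys,
		gsimpl.MaxMemoryResponder(256<<20),       // 256 MiB queued across all peers (assumed option)
		gsimpl.MaxMemoryPerPeerResponder(16<<20), // 16 MiB queued per peer
	)
}

Applications that leaned on the removed options to cap requestor-side memory now need to bound it externally, for example by limiting how many requests they keep in flight at once.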