Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!(requestmanager): remove request allocation backpressure #272

Merged
merged 3 commits into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 1 addition & 35 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,11 @@ type GraphSync struct {
ctx context.Context
cancel context.CancelFunc
responseAllocator *allocator.Allocator
requestAllocator *allocator.Allocator
}

type graphsyncConfigOptions struct {
totalMaxMemoryResponder uint64
maxMemoryPerPeerResponder uint64
totalMaxMemoryRequestor uint64
maxMemoryPerPeerRequestor uint64
maxInProgressIncomingRequests uint64
maxInProgressIncomingRequestsPerPeer uint64
maxInProgressOutgoingRequests uint64
Expand Down Expand Up @@ -116,22 +113,6 @@ func MaxMemoryPerPeerResponder(maxMemoryPerPeer uint64) Option {
}
}

// MaxMemoryRequestor defines the maximum amount of memory the responder
// may consume queueing up messages for a response in total
func MaxMemoryRequestor(totalMaxMemory uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.totalMaxMemoryRequestor = totalMaxMemory
}
}

// MaxMemoryPerPeerRequestor defines the maximum amount of memory a peer
// may consume queueing up messages for a response
func MaxMemoryPerPeerRequestor(maxMemoryPerPeer uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.maxMemoryPerPeerRequestor = maxMemoryPerPeer
}
}

// MaxInProgressIncomingRequests changes the maximum number of
// incoming graphsync requests that are processed in parallel (default 6)
func MaxInProgressIncomingRequests(maxInProgressIncomingRequests uint64) Option {
Expand Down Expand Up @@ -214,8 +195,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
gsConfig := &graphsyncConfigOptions{
totalMaxMemoryResponder: defaultTotalMaxMemory,
maxMemoryPerPeerResponder: defaultMaxMemoryPerPeer,
totalMaxMemoryRequestor: defaultTotalMaxMemory,
maxMemoryPerPeerRequestor: defaultMaxMemoryPerPeer,
maxInProgressIncomingRequests: defaultMaxInProgressRequests,
maxInProgressOutgoingRequests: defaultMaxInProgressRequests,
registerDefaultValidator: true,
Expand Down Expand Up @@ -247,9 +226,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
return messagequeue.New(ctx, p, network, responseAllocator, gsConfig.messageSendRetries, gsConfig.sendMessageTimeout)
}
peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
requestAllocator := allocator.NewAllocator(gsConfig.totalMaxMemoryRequestor, gsConfig.maxMemoryPerPeerRequestor)

asyncLoader := asyncloader.New(ctx, linkSystem, requestAllocator)
asyncLoader := asyncloader.New(ctx, linkSystem)
requestQueue := taskqueue.NewTaskQueue(ctx)
requestManager := requestmanager.New(ctx, asyncLoader, linkSystem, outgoingRequestHooks, incomingResponseHooks, networkErrorListeners, outgoingRequestProcessingListeners, requestQueue, network.ConnectionManager(), gsConfig.maxLinksPerOutgoingRequest)
requestExecutor := executor.NewExecutor(requestManager, incomingBlockHooks, asyncLoader.AsyncLoad)
Expand Down Expand Up @@ -313,7 +291,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
ctx: ctx,
cancel: cancel,
responseAllocator: responseAllocator,
requestAllocator: requestAllocator,
}

requestManager.SetDelegate(peerManager)
Expand Down Expand Up @@ -453,7 +430,6 @@ func (gs *GraphSync) CancelRequest(ctx context.Context, requestID graphsync.Requ
// Stats produces insight on the current state of a graphsync exchange
func (gs *GraphSync) Stats() graphsync.Stats {
outgoingRequestStats := gs.requestQueue.Stats()
incomingResponseStats := gs.requestAllocator.Stats()

ptqstats := gs.peerTaskQueue.Stats()
incomingRequestStats := graphsync.RequestStats{
Expand All @@ -465,8 +441,6 @@ func (gs *GraphSync) Stats() graphsync.Stats {

return graphsync.Stats{
OutgoingRequests: outgoingRequestStats,
IncomingResponses: incomingResponseStats,

IncomingRequests: incomingRequestStats,
OutgoingResponses: outgoingResponseStats,
}
Expand All @@ -485,14 +459,6 @@ func (gsr *graphSyncReceiver) ReceiveMessage(
sender peer.ID,
incoming gsmsg.GraphSyncMessage) {
gsr.graphSync().responseManager.ProcessRequests(ctx, sender, incoming.Requests())
totalMemoryAllocated := uint64(0)
for _, blk := range incoming.Blocks() {
totalMemoryAllocated += uint64(len(blk.RawData()))
}
select {
case <-gsr.graphSync().requestAllocator.AllocateBlockMemory(sender, totalMemoryAllocated):
case <-gsr.ctx.Done():
}
gsr.graphSync().requestManager.ProcessResponses(sender, incoming.Responses(), incoming.Blocks())
}

Expand Down
34 changes: 7 additions & 27 deletions requestmanager/asyncloader/asyncloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync"

blocks "github.com/ipfs/go-block-format"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
peer "github.com/libp2p/go-libp2p-core/peer"

Expand All @@ -20,24 +19,16 @@ import (
"github.com/ipfs/go-graphsync/requestmanager/types"
)

var log = logging.Logger("gs-asyncloader")

type alternateQueue struct {
responseCache *responsecache.ResponseCache
loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
}

// Allocator indicates a mechanism for tracking memory used by a given peer
type Allocator interface {
ReleaseBlockMemory(p peer.ID, amount uint64) error
}

// AsyncLoader manages loading links asynchronously in as new responses
// come in from the network
type AsyncLoader struct {
ctx context.Context
cancel context.CancelFunc
allocator Allocator
ctx context.Context
cancel context.CancelFunc

// this mutex protects access to the state of the async loader, which covers all data fields below below
stateLk sync.Mutex
Expand All @@ -50,8 +41,8 @@ type AsyncLoader struct {

// New initializes a new link loading manager for asynchronous loads from the given context
// and local store loading and storing function
func New(ctx context.Context, linkSystem ipld.LinkSystem, allocator Allocator) *AsyncLoader {
responseCache, loadAttemptQueue := setupAttemptQueue(linkSystem, allocator)
func New(ctx context.Context, linkSystem ipld.LinkSystem) *AsyncLoader {
responseCache, loadAttemptQueue := setupAttemptQueue(linkSystem)
ctx, cancel := context.WithCancel(ctx)
return &AsyncLoader{
ctx: ctx,
Expand All @@ -61,7 +52,6 @@ func New(ctx context.Context, linkSystem ipld.LinkSystem, allocator Allocator) *
alternateQueues: make(map[string]alternateQueue),
responseCache: responseCache,
loadAttemptQueue: loadAttemptQueue,
allocator: allocator,
}
}

Expand All @@ -73,7 +63,7 @@ func (al *AsyncLoader) RegisterPersistenceOption(name string, lsys ipld.LinkSyst
if existing {
return errors.New("already registerd a persistence option with this name")
}
responseCache, loadAttemptQueue := setupAttemptQueue(lsys, al.allocator)
responseCache, loadAttemptQueue := setupAttemptQueue(lsys)
al.alternateQueues[name] = alternateQueue{responseCache, loadAttemptQueue}
return nil
}
Expand Down Expand Up @@ -170,13 +160,7 @@ func (al *AsyncLoader) CleanupRequest(p peer.ID, requestID graphsync.RequestID)
responseCache = al.alternateQueues[aq].responseCache
delete(al.requestQueues, requestID)
}
toFree := responseCache.FinishRequest(requestID)
if toFree > 0 {
err := al.allocator.ReleaseBlockMemory(p, toFree)
if err != nil {
log.Infow("Error deallocating requestor memory", "p", p, "toFree", toFree, "err", err)
}
}
responseCache.FinishRequest(requestID)
}

func (al *AsyncLoader) getLoadAttemptQueue(queue string) *loadattemptqueue.LoadAttemptQueue {
Expand All @@ -193,7 +177,7 @@ func (al *AsyncLoader) getResponseCache(queue string) *responsecache.ResponseCac
return al.alternateQueues[queue].responseCache
}

func setupAttemptQueue(lsys ipld.LinkSystem, allocator Allocator) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) {
func setupAttemptQueue(lsys ipld.LinkSystem) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) {

unverifiedBlockStore := unverifiedblockstore.New(lsys.StorageWriteOpener)
responseCache := responsecache.New(unverifiedBlockStore)
Expand All @@ -204,10 +188,6 @@ func setupAttemptQueue(lsys ipld.LinkSystem, allocator Allocator) (*responsecach
return types.AsyncLoadResult{Err: err, Local: false}
}
if data != nil {
err = allocator.ReleaseBlockMemory(p, uint64(len(data)))
if err != nil {
log.Warningf("releasing block memory: %s", err.Error())
}
return types.AsyncLoadResult{Data: data, Local: false}
}
// fall back to local store
Expand Down
4 changes: 1 addition & 3 deletions requestmanager/asyncloader/asyncloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/require"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/allocator"
"github.com/ipfs/go-graphsync/metadata"
"github.com/ipfs/go-graphsync/requestmanager/types"
"github.com/ipfs/go-graphsync/testutil"
Expand Down Expand Up @@ -385,8 +384,7 @@ func withLoader(st *store, exec func(ctx context.Context, asyncLoader *AsyncLoad
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
allocator := allocator.NewAllocator(256*(1<<20), 16*(1<<20))
asyncLoader := New(ctx, st.lsys, allocator)
asyncLoader := New(ctx, st.lsys)
exec(ctx, asyncLoader)
}

Expand Down
10 changes: 2 additions & 8 deletions requestmanager/asyncloader/responsecache/responsecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,14 @@ func New(unverifiedBlockStore UnverifiedBlockStore) *ResponseCache {
// FinishRequest indicate there is no more need to track blocks tied to this
// response. It returns the total number of bytes in blocks that were being
// tracked but are no longer in memory
func (rc *ResponseCache) FinishRequest(requestID graphsync.RequestID) uint64 {
func (rc *ResponseCache) FinishRequest(requestID graphsync.RequestID) {
rc.responseCacheLk.Lock()
rc.linkTracker.FinishRequest(requestID)

toFree := uint64(0)
rc.unverifiedBlockStore.PruneBlocks(func(link ipld.Link, amt uint64) bool {
shouldPrune := rc.linkTracker.BlockRefCount(link) == 0
if shouldPrune {
toFree += amt
}
return shouldPrune
return rc.linkTracker.BlockRefCount(link) == 0
})
rc.responseCacheLk.Unlock()
return toFree
}

// AttemptLoad attempts to laod the given block from the cache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,13 @@ func TestResponseCacheManagingLinks(t *testing.T) {
require.NoError(t, err)
require.Nil(t, data, "no data should be returned for unknown block")

toFree := responseCache.FinishRequest(requestID1)
responseCache.FinishRequest(requestID1)
// should remove only block 0, since it now has no refering outstanding requests
require.Len(t, fubs.blocks(), len(blks)-4, "should prune block when it is orphaned")
testutil.RefuteContainsBlock(t, fubs.blocks(), blks[0])
require.Equal(t, toFree, uint64(len(blks[0].RawData())))

responseCache.FinishRequest(requestID2)
// should remove last block since are no remaining references
require.Len(t, fubs.blocks(), 0, "should prune block when it is orphaned")
testutil.RefuteContainsBlock(t, fubs.blocks(), blks[3])
require.Equal(t, toFree, uint64(len(blks[3].RawData())))

}