Skip to content

Commit

Permalink
Merge final v0.6.x commit history, and 0.8.0 changelog (#205)
Browse files Browse the repository at this point in the history
* docs(CHANGELOG): update for v0.8.0

* log in-memory usage

* fix log name

* record in-memory bs size

* fix calc

* Apply suggestions from code review

Co-authored-by: raulk <raul@protocol.ai>

* feat(asyncloader): memory pressure incoming responses

Block reading incoming responses to avoid memory pressure buildup

* feat(testplans): add car store support for faster testing

* fix

* fix(testplan): fix file buffer ds setup

* fix(asyncloader): cleanup load logic for async loader

* docs(CHANGELOG): update for 0.6.9 release

also cleanup a couple style issues

* style(lint): fix lint errs

Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
Co-authored-by: raulk <raul@protocol.ai>
  • Loading branch information
3 people authored Aug 26, 2021
1 parent d508b67 commit 41364ef
Show file tree
Hide file tree
Showing 21 changed files with 864 additions and 181 deletions.
43 changes: 43 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,31 @@
# go-graphsync changelog

# go-graphsync 0.8.0

This release updates to the v0.9.0 branch of go-ipld-prime and adds a "trusted store" optimization that may produce important speed improvements.

It also includes several improvements to the internal testplan & updated
architecture docs.

### Changelog

- github.com/ipfs/go-graphsync:
- Update for LinkSystem (#161) ([ipfs/go-graphsync#161](https://github.com/ipfs/go-graphsync/pull/161))
- Round out diagnostic parameters (#157) ([ipfs/go-graphsync#157](https://github.com/ipfs/go-graphsync/pull/157))
- map response codes to names (#148) ([ipfs/go-graphsync#148](https://github.com/ipfs/go-graphsync/pull/148))
- Discard http output (#156) ([ipfs/go-graphsync#156](https://github.com/ipfs/go-graphsync/pull/156))
- Add debug logging (#121) ([ipfs/go-graphsync#121](https://github.com/ipfs/go-graphsync/pull/121))
- Add optional HTTP comparison (#153) ([ipfs/go-graphsync#153](https://github.com/ipfs/go-graphsync/pull/153))
- docs(architecture): update architecture docs (#154) ([ipfs/go-graphsync#154](https://github.com/ipfs/go-graphsync/pull/154))

### Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| Hannah Howard | 5 | +885/-598 | 55 |
| dirkmc | 1 | +79/-50 | 2 |
| Aarsh Shah | 1 | +2/-6 | 2 |

# go-graphsync 0.7.0

This is a small release to update some dependencies. Importantly, it pulls in go-ipld-prime with
Expand All @@ -20,6 +46,23 @@ some significant breaking changes.
| Hannah Howard | 2 | +3316/-3015 | 25 |
| Steven Allen | 1 | +95/-227 | 5 |

# go-graphsync 0.6.9

This release adds additional log statements and addresses a memory performance bug on the requesting side when making lots of outgoing requests at once

### Changelog

- github.com/ipfs/go-graphsync:
- Back pressure incoming responses ([ipfs/go-graphsync#204](https://github.com/ipfs/go-graphsync/pull/204))
- Log unverified blockstore memory consumption ([ipfs/go-graphsync#201](https://github.com/ipfs/go-graphsync/pull/201))

### Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| hannahhoward | 5 | +1535/-381 | 25 |
| Aarsh Shah | 5 | +27/-17 | 5 |

# go-graphsync 0.6.8

### Changelog
Expand Down
20 changes: 20 additions & 0 deletions benchmarks/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ func BenchmarkRoundtripSuccess(b *testing.B) {
b.Run("test-p2p-stress-1-1GB-memory-pressure", func(b *testing.B) {
p2pStrestTest(ctx, b, 1, allFilesUniformSize(1*(1<<30), 1<<20, 1024, true), tdm, []graphsync.Option{graphsync.MaxMemoryResponder(1 << 27)}, true)
})
b.Run("test-p2p-stress-1-1GB-memory-pressure-missing-blocks", func(b *testing.B) {
p2pStrestTest(ctx, b, 1, allFilesMissingTopLevelBlock(1*(1<<30), 1<<20, 1024, true), tdm, []graphsync.Option{graphsync.MaxMemoryResponder(1 << 27)}, true)
})
b.Run("test-p2p-stress-1-1GB-memory-pressure-no-raw-nodes", func(b *testing.B) {
p2pStrestTest(ctx, b, 1, allFilesUniformSize(1*(1<<30), 1<<20, 1024, false), tdm, []graphsync.Option{graphsync.MaxMemoryResponder(1 << 27)}, true)
})
Expand Down Expand Up @@ -302,6 +305,23 @@ func allFilesUniformSize(size uint64, unixfsChunkSize uint64, unixfsLinksPerLeve
}
}

func allFilesMissingTopLevelBlock(size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int, useRawNodes bool) distFunc {
return func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid {
cids := make([]cid.Cid, 0, len(provs))
for _, prov := range provs {
c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size, unixfsChunkSize, unixfsLinksPerLevel, useRawNodes)
ds := merkledag.NewDAGService(blockservice.New(prov.BlockStore, offline.Exchange(prov.BlockStore)))
lnks, err := ds.GetLinks(ctx, c)
require.NoError(b, err)
randLink := lnks[rand.Intn(len(lnks))]
err = ds.Remove(ctx, randLink.Cid)
require.NoError(b, err)
cids = append(cids, c)
}
return cids
}
}

type tempDirMaker struct {
tdm string
tempDirSeq int32
Expand Down
50 changes: 36 additions & 14 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ type GraphSync struct {
}

type graphsyncConfigOptions struct {
totalMaxMemory uint64
maxMemoryPerPeer uint64
maxInProgressRequests uint64
registerDefaultValidator bool
totalMaxMemoryResponder uint64
maxMemoryPerPeerResponder uint64
totalMaxMemoryRequestor uint64
maxMemoryPerPeerRequestor uint64
maxInProgressRequests uint64
registerDefaultValidator bool
}

// Option defines the functional option type that can be used to configure
Expand All @@ -84,15 +86,31 @@ func RejectAllRequestsByDefault() Option {
// may consume queueing up messages for a response in total
func MaxMemoryResponder(totalMaxMemory uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.totalMaxMemory = totalMaxMemory
gs.totalMaxMemoryResponder = totalMaxMemory
}
}

// MaxMemoryPerPeerResponder defines the maximum amount of memory a peer
// may consume queueing up messages for a response
func MaxMemoryPerPeerResponder(maxMemoryPerPeer uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.maxMemoryPerPeer = maxMemoryPerPeer
gs.maxMemoryPerPeerResponder = maxMemoryPerPeer
}
}

// MaxMemoryRequestor defines the maximum amount of memory the responder
// may consume queueing up messages for a response in total
func MaxMemoryRequestor(totalMaxMemory uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.totalMaxMemoryRequestor = totalMaxMemory
}
}

// MaxMemoryPerPeerRequestor defines the maximum amount of memory a peer
// may consume queueing up messages for a response
func MaxMemoryPerPeerRequestor(maxMemoryPerPeer uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.maxMemoryPerPeerRequestor = maxMemoryPerPeer
}
}

Expand All @@ -111,10 +129,12 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
ctx, cancel := context.WithCancel(parent)

gsConfig := &graphsyncConfigOptions{
totalMaxMemory: defaultTotalMaxMemory,
maxMemoryPerPeer: defaultMaxMemoryPerPeer,
maxInProgressRequests: defaultMaxInProgressRequests,
registerDefaultValidator: true,
totalMaxMemoryResponder: defaultTotalMaxMemory,
maxMemoryPerPeerResponder: defaultMaxMemoryPerPeer,
totalMaxMemoryRequestor: defaultTotalMaxMemory,
maxMemoryPerPeerRequestor: defaultMaxMemoryPerPeer,
maxInProgressRequests: defaultMaxInProgressRequests,
registerDefaultValidator: true,
}
for _, option := range options {
option(gsConfig)
Expand All @@ -135,12 +155,14 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
if gsConfig.registerDefaultValidator {
incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
}
allocator := allocator.NewAllocator(gsConfig.totalMaxMemory, gsConfig.maxMemoryPerPeer)
responseAllocator := allocator.NewAllocator(gsConfig.totalMaxMemoryResponder, gsConfig.maxMemoryPerPeerResponder)
createMessageQueue := func(ctx context.Context, p peer.ID) peermanager.PeerQueue {
return messagequeue.New(ctx, p, network, allocator)
return messagequeue.New(ctx, p, network, responseAllocator)
}
peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
asyncLoader := asyncloader.New(ctx, linkSystem)
requestAllocator := allocator.NewAllocator(gsConfig.totalMaxMemoryRequestor, gsConfig.maxMemoryPerPeerRequestor)

asyncLoader := asyncloader.New(ctx, linkSystem, requestAllocator)
requestManager := requestmanager.New(ctx, asyncLoader, linkSystem, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners)
responseAssembler := responseassembler.New(ctx, peerManager)
peerTaskQueue := peertaskqueue.New()
Expand Down Expand Up @@ -169,7 +191,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
persistenceOptions: persistenceOptions,
ctx: ctx,
cancel: cancel,
allocator: allocator,
allocator: responseAllocator,
}

asyncLoader.Startup()
Expand Down
5 changes: 3 additions & 2 deletions message/pb/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 42 additions & 24 deletions requestmanager/asyncloader/asyncloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"io/ioutil"

blocks "github.com/ipfs/go-block-format"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
peer "github.com/libp2p/go-libp2p-core/peer"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/metadata"
Expand All @@ -16,6 +18,8 @@ import (
"github.com/ipfs/go-graphsync/requestmanager/types"
)

var log = logging.Logger("gs-asyncloader")

type loaderMessage interface {
handle(al *AsyncLoader)
}
Expand All @@ -25,6 +29,12 @@ type alternateQueue struct {
loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
}

// Allocator indicates a mechanism for tracking memory used by a given peer
type Allocator interface {
AllocateBlockMemory(p peer.ID, amount uint64) <-chan error
ReleaseBlockMemory(p peer.ID, amount uint64) error
}

// AsyncLoader manages loading links asynchronously in as new responses
// come in from the network
type AsyncLoader struct {
Expand All @@ -39,12 +49,13 @@ type AsyncLoader struct {
alternateQueues map[string]alternateQueue
responseCache *responsecache.ResponseCache
loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
allocator Allocator
}

// New initializes a new link loading manager for asynchronous loads from the given context
// and local store loading and storing function
func New(ctx context.Context, linkSystem ipld.LinkSystem) *AsyncLoader {
responseCache, loadAttemptQueue := setupAttemptQueue(linkSystem)
func New(ctx context.Context, linkSystem ipld.LinkSystem, allocator Allocator) *AsyncLoader {
responseCache, loadAttemptQueue := setupAttemptQueue(linkSystem, allocator)
ctx, cancel := context.WithCancel(ctx)
return &AsyncLoader{
ctx: ctx,
Expand All @@ -57,6 +68,7 @@ func New(ctx context.Context, linkSystem ipld.LinkSystem) *AsyncLoader {
alternateQueues: make(map[string]alternateQueue),
responseCache: responseCache,
loadAttemptQueue: loadAttemptQueue,
allocator: allocator,
}
}

Expand Down Expand Up @@ -101,8 +113,16 @@ func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID, persistenceOp

// ProcessResponse injests new responses and completes asynchronous loads as
// neccesary
func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
func (al *AsyncLoader) ProcessResponse(p peer.ID, responses map[graphsync.RequestID]metadata.Metadata,
blks []blocks.Block) {
totalMemoryAllocated := uint64(0)
for _, blk := range blks {
totalMemoryAllocated += uint64(len(blk.RawData()))
}
select {
case <-al.allocator.AllocateBlockMemory(p, totalMemoryAllocated):
case <-al.ctx.Done():
}
select {
case <-al.ctx.Done():
case al.incomingMessages <- &newResponsesAvailableMessage{responses, blks}:
Expand All @@ -111,10 +131,10 @@ func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadat

// AsyncLoad asynchronously loads the given link for the given request ID. It returns a channel for data and a channel
// for errors -- only one message will be sent over either.
func (al *AsyncLoader) AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult {
func (al *AsyncLoader) AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult {
resultChan := make(chan types.AsyncLoadResult, 1)
response := make(chan error, 1)
lr := loadattemptqueue.NewLoadRequest(requestID, link, resultChan)
lr := loadattemptqueue.NewLoadRequest(p, requestID, link, resultChan)
_ = al.sendSyncMessage(&loadRequestMessage{response, requestID, lr}, response)
return resultChan
}
Expand Down Expand Up @@ -255,7 +275,7 @@ func (rpom *registerPersistenceOptionMessage) register(al *AsyncLoader) error {
if existing {
return errors.New("already registerd a persistence option with this name")
}
responseCache, loadAttemptQueue := setupAttemptQueue(rpom.linkSystem)
responseCache, loadAttemptQueue := setupAttemptQueue(rpom.linkSystem, al.allocator)
al.alternateQueues[rpom.name] = alternateQueue{responseCache, loadAttemptQueue}
return nil
}
Expand Down Expand Up @@ -344,32 +364,30 @@ func (crm *cleanupRequestMessage) handle(al *AsyncLoader) {
al.responseCache.FinishRequest(crm.requestID)
}

func setupAttemptQueue(lsys ipld.LinkSystem) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) {
func setupAttemptQueue(lsys ipld.LinkSystem, allocator Allocator) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) {

unverifiedBlockStore := unverifiedblockstore.New(lsys.StorageWriteOpener)
responseCache := responsecache.New(unverifiedBlockStore)
loadAttemptQueue := loadattemptqueue.New(func(requestID graphsync.RequestID, link ipld.Link) types.AsyncLoadResult {
loadAttemptQueue := loadattemptqueue.New(func(p peer.ID, requestID graphsync.RequestID, link ipld.Link) types.AsyncLoadResult {
// load from response cache
data, err := responseCache.AttemptLoad(requestID, link)
if data == nil && err == nil {
// fall back to local store
stream, loadErr := lsys.StorageReadOpener(ipld.LinkContext{}, link)
if stream != nil && loadErr == nil {
localData, loadErr := ioutil.ReadAll(stream)
if loadErr == nil && localData != nil {
return types.AsyncLoadResult{
Data: localData,
Err: nil,
Local: true,
}
}
if err != nil {
return types.AsyncLoadResult{Err: err, Local: false}
}
if data != nil {
err = allocator.ReleaseBlockMemory(p, uint64(len(data)))
if err != nil {
log.Warningf("releasing block memory: %s", err.Error())
}
return types.AsyncLoadResult{Data: data, Local: false}
}
return types.AsyncLoadResult{
Data: data,
Err: err,
Local: false,
// fall back to local store
if stream, err := lsys.StorageReadOpener(ipld.LinkContext{}, link); stream != nil && err == nil {
if localData, err := ioutil.ReadAll(stream); err == nil && localData != nil {
return types.AsyncLoadResult{Data: localData, Local: true}
}
}
return types.AsyncLoadResult{Local: false}
})

return responseCache, loadAttemptQueue
Expand Down
Loading

0 comments on commit 41364ef

Please sign in to comment.