Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge final v0.6.x commit history, and 0.8.0 changelog #205

Merged
merged 18 commits into from
Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,31 @@
# go-graphsync changelog

# go-graphsync 0.8.0

This release updates to the v0.9.0 branch of go-ipld-prime and adds a "trusted store" optimization that may produce important speed improvements.

It also includes several improvements to the internal testplan & updated
architecture docs.

### Changelog

- github.com/ipfs/go-graphsync:
- Update for LinkSystem (#161) ([ipfs/go-graphsync#161](https://github.com/ipfs/go-graphsync/pull/161))
- Round out diagnostic parameters (#157) ([ipfs/go-graphsync#157](https://github.com/ipfs/go-graphsync/pull/157))
- map response codes to names (#148) ([ipfs/go-graphsync#148](https://github.com/ipfs/go-graphsync/pull/148))
- Discard http output (#156) ([ipfs/go-graphsync#156](https://github.com/ipfs/go-graphsync/pull/156))
- Add debug logging (#121) ([ipfs/go-graphsync#121](https://github.com/ipfs/go-graphsync/pull/121))
- Add optional HTTP comparison (#153) ([ipfs/go-graphsync#153](https://github.com/ipfs/go-graphsync/pull/153))
- docs(architecture): update architecture docs (#154) ([ipfs/go-graphsync#154](https://github.com/ipfs/go-graphsync/pull/154))

### Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| Hannah Howard | 5 | +885/-598 | 55 |
| dirkmc | 1 | +79/-50 | 2 |
| Aarsh Shah | 1 | +2/-6 | 2 |

# go-graphsync 0.7.0

This is a small release to update some dependencies. Importantly, it pulls in go-ipld-prime with
Expand All @@ -20,6 +46,23 @@ some significant breaking changes.
| Hannah Howard | 2 | +3316/-3015 | 25 |
| Steven Allen | 1 | +95/-227 | 5 |

# go-graphsync 0.6.9

This release adds additional log statements and addresses a memory performance bug on the requesting side when making lots of outgoing requests at once

### Changelog

- github.com/ipfs/go-graphsync:
- Back pressure incoming responses ([ipfs/go-graphsync#204](https://github.com/ipfs/go-graphsync/pull/204))
- Log unverified blockstore memory consumption ([ipfs/go-graphsync#201](https://github.com/ipfs/go-graphsync/pull/201))

### Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| hannahhoward | 5 | +1535/-381 | 25 |
| Aarsh Shah | 5 | +27/-17 | 5 |

# go-graphsync 0.6.8

### Changelog
Expand Down
20 changes: 20 additions & 0 deletions benchmarks/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ func BenchmarkRoundtripSuccess(b *testing.B) {
b.Run("test-p2p-stress-1-1GB-memory-pressure", func(b *testing.B) {
p2pStrestTest(ctx, b, 1, allFilesUniformSize(1*(1<<30), 1<<20, 1024, true), tdm, []graphsync.Option{graphsync.MaxMemoryResponder(1 << 27)}, true)
})
b.Run("test-p2p-stress-1-1GB-memory-pressure-missing-blocks", func(b *testing.B) {
p2pStrestTest(ctx, b, 1, allFilesMissingTopLevelBlock(1*(1<<30), 1<<20, 1024, true), tdm, []graphsync.Option{graphsync.MaxMemoryResponder(1 << 27)}, true)
})
b.Run("test-p2p-stress-1-1GB-memory-pressure-no-raw-nodes", func(b *testing.B) {
p2pStrestTest(ctx, b, 1, allFilesUniformSize(1*(1<<30), 1<<20, 1024, false), tdm, []graphsync.Option{graphsync.MaxMemoryResponder(1 << 27)}, true)
})
Expand Down Expand Up @@ -302,6 +305,23 @@ func allFilesUniformSize(size uint64, unixfsChunkSize uint64, unixfsLinksPerLeve
}
}

func allFilesMissingTopLevelBlock(size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int, useRawNodes bool) distFunc {
return func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid {
cids := make([]cid.Cid, 0, len(provs))
for _, prov := range provs {
c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size, unixfsChunkSize, unixfsLinksPerLevel, useRawNodes)
ds := merkledag.NewDAGService(blockservice.New(prov.BlockStore, offline.Exchange(prov.BlockStore)))
lnks, err := ds.GetLinks(ctx, c)
require.NoError(b, err)
randLink := lnks[rand.Intn(len(lnks))]
err = ds.Remove(ctx, randLink.Cid)
require.NoError(b, err)
cids = append(cids, c)
}
return cids
}
}

type tempDirMaker struct {
tdm string
tempDirSeq int32
Expand Down
50 changes: 36 additions & 14 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ type GraphSync struct {
}

type graphsyncConfigOptions struct {
totalMaxMemory uint64
maxMemoryPerPeer uint64
maxInProgressRequests uint64
registerDefaultValidator bool
totalMaxMemoryResponder uint64
maxMemoryPerPeerResponder uint64
totalMaxMemoryRequestor uint64
maxMemoryPerPeerRequestor uint64
maxInProgressRequests uint64
registerDefaultValidator bool
}

// Option defines the functional option type that can be used to configure
Expand All @@ -84,15 +86,31 @@ func RejectAllRequestsByDefault() Option {
// may consume queueing up messages for a response in total
func MaxMemoryResponder(totalMaxMemory uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.totalMaxMemory = totalMaxMemory
gs.totalMaxMemoryResponder = totalMaxMemory
}
}

// MaxMemoryPerPeerResponder defines the maximum amount of memory a peer
// may consume queueing up messages for a response
func MaxMemoryPerPeerResponder(maxMemoryPerPeer uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.maxMemoryPerPeer = maxMemoryPerPeer
gs.maxMemoryPerPeerResponder = maxMemoryPerPeer
}
}

// MaxMemoryRequestor defines the maximum amount of memory the responder
// may consume queueing up messages for a response in total
func MaxMemoryRequestor(totalMaxMemory uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.totalMaxMemoryRequestor = totalMaxMemory
}
}

// MaxMemoryPerPeerRequestor defines the maximum amount of memory a peer
// may consume queueing up messages for a response
func MaxMemoryPerPeerRequestor(maxMemoryPerPeer uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.maxMemoryPerPeerRequestor = maxMemoryPerPeer
}
}

Expand All @@ -111,10 +129,12 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
ctx, cancel := context.WithCancel(parent)

gsConfig := &graphsyncConfigOptions{
totalMaxMemory: defaultTotalMaxMemory,
maxMemoryPerPeer: defaultMaxMemoryPerPeer,
maxInProgressRequests: defaultMaxInProgressRequests,
registerDefaultValidator: true,
totalMaxMemoryResponder: defaultTotalMaxMemory,
maxMemoryPerPeerResponder: defaultMaxMemoryPerPeer,
totalMaxMemoryRequestor: defaultTotalMaxMemory,
maxMemoryPerPeerRequestor: defaultMaxMemoryPerPeer,
maxInProgressRequests: defaultMaxInProgressRequests,
registerDefaultValidator: true,
}
for _, option := range options {
option(gsConfig)
Expand All @@ -135,12 +155,14 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
if gsConfig.registerDefaultValidator {
incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
}
allocator := allocator.NewAllocator(gsConfig.totalMaxMemory, gsConfig.maxMemoryPerPeer)
responseAllocator := allocator.NewAllocator(gsConfig.totalMaxMemoryResponder, gsConfig.maxMemoryPerPeerResponder)
createMessageQueue := func(ctx context.Context, p peer.ID) peermanager.PeerQueue {
return messagequeue.New(ctx, p, network, allocator)
return messagequeue.New(ctx, p, network, responseAllocator)
}
peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
asyncLoader := asyncloader.New(ctx, linkSystem)
requestAllocator := allocator.NewAllocator(gsConfig.totalMaxMemoryRequestor, gsConfig.maxMemoryPerPeerRequestor)

asyncLoader := asyncloader.New(ctx, linkSystem, requestAllocator)
requestManager := requestmanager.New(ctx, asyncLoader, linkSystem, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners)
responseAssembler := responseassembler.New(ctx, peerManager)
peerTaskQueue := peertaskqueue.New()
Expand Down Expand Up @@ -169,7 +191,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
persistenceOptions: persistenceOptions,
ctx: ctx,
cancel: cancel,
allocator: allocator,
allocator: responseAllocator,
}

asyncLoader.Startup()
Expand Down
5 changes: 3 additions & 2 deletions message/pb/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 42 additions & 24 deletions requestmanager/asyncloader/asyncloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"io/ioutil"

blocks "github.com/ipfs/go-block-format"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
peer "github.com/libp2p/go-libp2p-core/peer"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/metadata"
Expand All @@ -16,6 +18,8 @@ import (
"github.com/ipfs/go-graphsync/requestmanager/types"
)

var log = logging.Logger("gs-asyncloader")

type loaderMessage interface {
handle(al *AsyncLoader)
}
Expand All @@ -25,6 +29,12 @@ type alternateQueue struct {
loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
}

// Allocator indicates a mechanism for tracking memory used by a given peer
type Allocator interface {
AllocateBlockMemory(p peer.ID, amount uint64) <-chan error
ReleaseBlockMemory(p peer.ID, amount uint64) error
}

// AsyncLoader manages loading links asynchronously in as new responses
// come in from the network
type AsyncLoader struct {
Expand All @@ -39,12 +49,13 @@ type AsyncLoader struct {
alternateQueues map[string]alternateQueue
responseCache *responsecache.ResponseCache
loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
allocator Allocator
}

// New initializes a new link loading manager for asynchronous loads from the given context
// and local store loading and storing function
func New(ctx context.Context, linkSystem ipld.LinkSystem) *AsyncLoader {
responseCache, loadAttemptQueue := setupAttemptQueue(linkSystem)
func New(ctx context.Context, linkSystem ipld.LinkSystem, allocator Allocator) *AsyncLoader {
responseCache, loadAttemptQueue := setupAttemptQueue(linkSystem, allocator)
ctx, cancel := context.WithCancel(ctx)
return &AsyncLoader{
ctx: ctx,
Expand All @@ -57,6 +68,7 @@ func New(ctx context.Context, linkSystem ipld.LinkSystem) *AsyncLoader {
alternateQueues: make(map[string]alternateQueue),
responseCache: responseCache,
loadAttemptQueue: loadAttemptQueue,
allocator: allocator,
}
}

Expand Down Expand Up @@ -101,8 +113,16 @@ func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID, persistenceOp

// ProcessResponse injests new responses and completes asynchronous loads as
// neccesary
func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
func (al *AsyncLoader) ProcessResponse(p peer.ID, responses map[graphsync.RequestID]metadata.Metadata,
blks []blocks.Block) {
totalMemoryAllocated := uint64(0)
for _, blk := range blks {
totalMemoryAllocated += uint64(len(blk.RawData()))
}
select {
case <-al.allocator.AllocateBlockMemory(p, totalMemoryAllocated):
case <-al.ctx.Done():
}
select {
case <-al.ctx.Done():
case al.incomingMessages <- &newResponsesAvailableMessage{responses, blks}:
Expand All @@ -111,10 +131,10 @@ func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadat

// AsyncLoad asynchronously loads the given link for the given request ID. It returns a channel for data and a channel
// for errors -- only one message will be sent over either.
func (al *AsyncLoader) AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult {
func (al *AsyncLoader) AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult {
resultChan := make(chan types.AsyncLoadResult, 1)
response := make(chan error, 1)
lr := loadattemptqueue.NewLoadRequest(requestID, link, resultChan)
lr := loadattemptqueue.NewLoadRequest(p, requestID, link, resultChan)
_ = al.sendSyncMessage(&loadRequestMessage{response, requestID, lr}, response)
return resultChan
}
Expand Down Expand Up @@ -255,7 +275,7 @@ func (rpom *registerPersistenceOptionMessage) register(al *AsyncLoader) error {
if existing {
return errors.New("already registerd a persistence option with this name")
}
responseCache, loadAttemptQueue := setupAttemptQueue(rpom.linkSystem)
responseCache, loadAttemptQueue := setupAttemptQueue(rpom.linkSystem, al.allocator)
al.alternateQueues[rpom.name] = alternateQueue{responseCache, loadAttemptQueue}
return nil
}
Expand Down Expand Up @@ -344,32 +364,30 @@ func (crm *cleanupRequestMessage) handle(al *AsyncLoader) {
al.responseCache.FinishRequest(crm.requestID)
}

func setupAttemptQueue(lsys ipld.LinkSystem) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) {
func setupAttemptQueue(lsys ipld.LinkSystem, allocator Allocator) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) {

unverifiedBlockStore := unverifiedblockstore.New(lsys.StorageWriteOpener)
responseCache := responsecache.New(unverifiedBlockStore)
loadAttemptQueue := loadattemptqueue.New(func(requestID graphsync.RequestID, link ipld.Link) types.AsyncLoadResult {
loadAttemptQueue := loadattemptqueue.New(func(p peer.ID, requestID graphsync.RequestID, link ipld.Link) types.AsyncLoadResult {
// load from response cache
data, err := responseCache.AttemptLoad(requestID, link)
if data == nil && err == nil {
// fall back to local store
stream, loadErr := lsys.StorageReadOpener(ipld.LinkContext{}, link)
if stream != nil && loadErr == nil {
localData, loadErr := ioutil.ReadAll(stream)
if loadErr == nil && localData != nil {
return types.AsyncLoadResult{
Data: localData,
Err: nil,
Local: true,
}
}
if err != nil {
return types.AsyncLoadResult{Err: err, Local: false}
}
if data != nil {
err = allocator.ReleaseBlockMemory(p, uint64(len(data)))
if err != nil {
log.Warningf("releasing block memory: %s", err.Error())
}
return types.AsyncLoadResult{Data: data, Local: false}
}
return types.AsyncLoadResult{
Data: data,
Err: err,
Local: false,
// fall back to local store
if stream, err := lsys.StorageReadOpener(ipld.LinkContext{}, link); stream != nil && err == nil {
if localData, err := ioutil.ReadAll(stream); err == nil && localData != nil {
return types.AsyncLoadResult{Data: localData, Local: true}
}
}
return types.AsyncLoadResult{Local: false}
})

return responseCache, loadAttemptQueue
Expand Down
Loading