Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mega update #4610

Merged
merged 25 commits into from
Jan 30, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
fe8846f
gx: mass update
Stebalien Jan 24, 2018
d82b527
switch to fast base58 library
Stebalien Jan 25, 2018
8899e98
fix tests that use invalid peer IDs
Stebalien Jan 25, 2018
281d5ea
Update ipns validator
dirkmc Jan 20, 2018
b90d7bd
Remove unneccesary split in IpnsValidator
dirkmc Jan 21, 2018
f6d507b
go fmt
dirkmc Jan 21, 2018
fdb0046
Fix ipns validator key parsing
dirkmc Jan 23, 2018
49569a6
gx update badger ds (fix internal deps issue)
Stebalien Jan 25, 2018
730754d
dagmodifier: remove useless offset update
Stebalien Jan 25, 2018
d154b4a
merkledag: switch to new dag interface
Stebalien Jan 25, 2018
0d12a97
namesys: remove unecessary peerID cast
Stebalien Jan 25, 2018
699b2c4
make code-climate happier
Stebalien Jan 25, 2018
1159abd
cmds: can't call SetError after SetOutput
Stebalien Jan 26, 2018
97cb6a0
fix dht tests
Stebalien Jan 26, 2018
d73bf86
note issue for failing dht test cases
Stebalien Jan 26, 2018
e049228
fix namesys pubsub sharness tests
Stebalien Jan 26, 2018
118ecb2
remove new DHT record author check
Stebalien Jan 26, 2018
d0998a9
make base64 decoding cross-platform
Stebalien Jan 27, 2018
5acbecc
update go-lib2p-loggables
Stebalien Jan 28, 2018
faae63f
handle error from changed NewFloodSub method
Stebalien Jan 28, 2018
1054826
switch base64 decoder based on OS
Stebalien Jan 28, 2018
f1aba97
fix error in test case
Stebalien Jan 29, 2018
f9d935b
rename import of go-ipld-format from node/format to ipld
Stebalien Jan 29, 2018
87c6914
update go-ds-badger
Stebalien Jan 29, 2018
2379aa0
update comment to note that we're punting on this
Stebalien Jan 29, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 39 additions & 33 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"io"

"github.com/ipfs/go-ipfs/blocks/blockstore"
exchange "github.com/ipfs/go-ipfs/exchange"
Expand All @@ -21,31 +22,42 @@ var log = logging.Logger("blockservice")

var ErrNotFound = errors.New("blockservice: key not found")

type BlockGetter interface {
// GetBlock gets the requested block.
GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error)

// GetBlocks does a batch request for the given cids, returning blocks as
// they are found, in no particular order.
//
// It may not be able to find all requested blocks (or the context may
// be canceled). In that case, it will close the channel early. It is up
// to the consumer to detect this situation and keep track which blocks
// it has received and which it hasn't.
GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block
}

// BlockService is a hybrid block datastore. It stores data in a local
// datastore and may retrieve data from a remote Exchange.
// It uses an internal `datastore.Datastore` instance to store values.
type BlockService interface {
io.Closer
BlockGetter

// Blockstore returns a reference to the underlying blockstore
Blockstore() blockstore.Blockstore

// Exchange returns a reference to the underlying exchange (usually bitswap)
Exchange() exchange.Interface

// AddBlock puts a given block to the underlying datastore
AddBlock(o blocks.Block) (*cid.Cid, error)
AddBlock(o blocks.Block) error

// AddBlocks adds a slice of blocks at the same time using batching
// capabilities of the underlying datastore whenever possible.
AddBlocks(bs []blocks.Block) ([]*cid.Cid, error)

GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error)
DeleteBlock(o blocks.Block) error
AddBlocks(bs []blocks.Block) error

// GetBlocks does a batch request for the given cids, returning blocks as
// they are found, in no particular order.
GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block

Close() error
// DeleteBlock deletes the given block from the blockservice.
DeleteBlock(o *cid.Cid) error
}

type blockService struct {
Expand Down Expand Up @@ -110,38 +122,34 @@ func NewSession(ctx context.Context, bs BlockService) *Session {

// AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
func (s *blockService) AddBlock(o blocks.Block) error {
c := o.Cid()
if s.checkFirst {
has, err := s.blockstore.Has(c)
if err != nil {
return nil, err
}

if has {
return c, nil
if has, err := s.blockstore.Has(c); has || err != nil {
return err
}
}

err := s.blockstore.Put(o)
if err != nil {
return nil, err
if err := s.blockstore.Put(o); err != nil {
return err
}

if err := s.exchange.HasBlock(o); err != nil {
return nil, errors.New("blockservice is closed")
// TODO(stebalien): really an error?
return errors.New("blockservice is closed")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@whyrusleeping Should we not just log and continue here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unclear. For now lets leave it as is.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair. Reported at #4623 (and the comment has been updated to point to the issue).

}

return c, nil
return nil
}

func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
func (s *blockService) AddBlocks(bs []blocks.Block) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we losing any functionality by dropping the cids from the return type?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really. We can get the CIDs from the blocks so there's no need to return them here. It just complicates the blockservice (and dagservice which is why I removed this).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although based on the current implementation we are not losing any functionally here. If AddBlocks is ever enhanced to just add some of the blocks (and not return early on error) it will make it difficult to determine which blocks where added.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't call that an enhancement

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. But it is still possible for something to go wrong while adding the blocks, such as running out of space. However, since the underlying BlockStore does not indicate which blocks where added there is no way to get this information. I will make a separate issue for this so we can figure out a proper solution.

var toput []blocks.Block
if s.checkFirst {
toput = make([]blocks.Block, 0, len(bs))
for _, b := range bs {
has, err := s.blockstore.Has(b.Cid())
if err != nil {
return nil, err
return err
}
if !has {
toput = append(toput, b)
Expand All @@ -153,18 +161,16 @@ func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {

err := s.blockstore.PutMany(toput)
if err != nil {
return nil, err
return err
}

var ks []*cid.Cid
for _, o := range toput {
if err := s.exchange.HasBlock(o); err != nil {
return nil, fmt.Errorf("blockservice is closed (%s)", err)
// TODO(stebalien): Should this really *return*?
return fmt.Errorf("blockservice is closed (%s)", err)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same?

}

ks = append(ks, o.Cid())
}
return ks, nil
return nil
}

// GetBlock retrieves a particular block from the service,
Expand Down Expand Up @@ -256,8 +262,8 @@ func getBlocks(ctx context.Context, ks []*cid.Cid, bs blockstore.Blockstore, f e
}

// DeleteBlock deletes a block in the blockservice from the datastore
func (s *blockService) DeleteBlock(o blocks.Block) error {
return s.blockstore.DeleteBlock(o.Cid())
func (s *blockService) DeleteBlock(c *cid.Cid) error {
return s.blockstore.DeleteBlock(c)
}

func (s *blockService) Close() error {
Expand Down
6 changes: 1 addition & 5 deletions blockservice/test/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,12 @@ func TestBlocks(t *testing.T) {
t.Error("Block key and data multihash key not equal")
}

k, err := bs.AddBlock(o)
err := bs.AddBlock(o)
if err != nil {
t.Error("failed to add block to BlockService", err)
return
}

if !k.Equals(o.Cid()) {
t.Error("returned key is not equal to block key", err)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
b2, err := bs.GetBlock(ctx, o.Cid())
Expand Down
4 changes: 2 additions & 2 deletions core/commands/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,14 @@ It reads from stdin, and <key> is a base58 encoded multihash.
return
}

k, err := n.Blocks.AddBlock(b)
err = n.Blocks.AddBlock(b)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

err = cmds.EmitOnce(res, &BlockStat{
Key: k.String(),
Key: b.Cid().String(),
Size: len(data),
})
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions core/commands/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
cmdkit "gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit"
files "gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit/files"
node "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)

var log = logging.Logger("cmds/files")
Expand Down Expand Up @@ -102,7 +103,7 @@ into an object of the specified format.

addAllAndPin := func(f files.File) error {
cids := cid.NewSet()
b := n.DAG.Batch()
b := node.NewBatch(req.Context(), n.DAG)

for {
file, err := f.NextFile()
Expand All @@ -122,7 +123,7 @@ into an object of the specified format.
}

for _, nd := range nds {
_, err := b.Add(nd)
err := b.Add(nd)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion core/commands/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
peer "gx/ipfs/Qma7H6RW8wRrfZpNSXwxYGcd1E149s42FpWNpDNieSVrnU/go-libp2p-peer"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
"gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit"
node "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
pstore "gx/ipfs/QmeZVQzUrXqaszo24DAoHfGzcmCptN9JyngLkGAiEfk2x7/go-libp2p-peerstore"
ipdht "gx/ipfs/QmfChjky1VNaHUQR9F2xqR1QEyX45pqU78nhsoq5GDYoKL/go-libp2p-kad-dht"
)
Expand Down Expand Up @@ -377,7 +378,7 @@ func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) er
return nil
}

func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv dag.DAGService, cids []*cid.Cid) error {
func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv node.DAGService, cids []*cid.Cid) error {
provided := cid.NewSet()
for _, c := range cids {
kset := cid.NewSet()
Expand Down
2 changes: 1 addition & 1 deletion core/commands/files/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func statGetFormatOptions(req cmds.Request) (string, error) {
}
}

func statNode(ds dag.DAGService, fsn mfs.FSNode) (*Object, error) {
func statNode(ds node.DAGService, fsn mfs.FSNode) (*Object, error) {
nd, err := fsn.GetNode()
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion core/commands/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ The JSON output contains type information.
t := unixfspb.Data_DataType(-1)

linkNode, err := link.GetNode(req.Context(), dserv)
if err == merkledag.ErrNotFound && !resolve {
if err == node.ErrNotFound && !resolve {
// not an error
linkNode = nil
} else if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions core/commands/object/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package objectcmd

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"encoding/xml"
Expand Down Expand Up @@ -422,7 +423,7 @@ And then run:
defer n.Blockstore.PinLock().Unlock()
}

objectCid, err := objectPut(n, input, inputenc, datafieldenc)
objectCid, err := objectPut(req.Context(), n, input, inputenc, datafieldenc)
if err != nil {
errType := cmdkit.ErrNormal
if err == ErrUnknownObjectEnc {
Expand Down Expand Up @@ -504,12 +505,12 @@ Available templates:
}
}

k, err := n.DAG.Add(node)
err = n.DAG.Add(req.Context(), node)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
res.SetOutput(&Object{Hash: k.String()})
res.SetOutput(&Object{Hash: node.Cid().String()})
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
Expand Down Expand Up @@ -542,7 +543,7 @@ func nodeFromTemplate(template string) (*dag.ProtoNode, error) {
var ErrEmptyNode = errors.New("no data or links in this node")

// objectPut takes a format option, serializes bytes from stdin and updates the dag with that data
func objectPut(n *core.IpfsNode, input io.Reader, encoding string, dataFieldEncoding string) (*cid.Cid, error) {
func objectPut(ctx context.Context, n *core.IpfsNode, input io.Reader, encoding string, dataFieldEncoding string) (*cid.Cid, error) {

data, err := ioutil.ReadAll(io.LimitReader(input, inputLimit+10))
if err != nil {
Expand Down Expand Up @@ -602,7 +603,7 @@ func objectPut(n *core.IpfsNode, input io.Reader, encoding string, dataFieldEnco
return nil, err
}

_, err = n.DAG.Add(dagnode)
err = n.DAG.Add(ctx, dagnode)
if err != nil {
return nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions core/commands/object/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,13 @@ the limit will not be respected by the network.

rtpb.SetData(append(rtpb.Data(), data...))

newkey, err := nd.DAG.Add(rtpb)
err = nd.DAG.Add(req.Context(), rtpb)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

res.SetOutput(&Object{Hash: newkey.String()})
res.SetOutput(&Object{Hash: rtpb.Cid().String()})
},
Type: Object{},
Marshalers: cmds.MarshalerMap{
Expand Down Expand Up @@ -177,13 +177,13 @@ Example:

rtpb.SetData(data)

newkey, err := nd.DAG.Add(rtpb)
err = nd.DAG.Add(req.Context(), rtpb)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

res.SetOutput(&Object{Hash: newkey.String()})
res.SetOutput(&Object{Hash: rtpb.Cid().String()})
},
Type: Object{},
Marshalers: cmds.MarshalerMap{
Expand Down Expand Up @@ -237,7 +237,7 @@ Removes a link by the given name from root.
return
}

nnode, err := e.Finalize(nd.DAG)
nnode, err := e.Finalize(req.Context(), nd.DAG)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
Expand Down Expand Up @@ -334,7 +334,7 @@ to a file containing 'bar', and returns the hash of the new object.
return
}

nnode, err := e.Finalize(nd.DAG)
nnode, err := e.Finalize(req.Context(), nd.DAG)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
Expand Down
9 changes: 7 additions & 2 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"io"
"time"

bserv "github.com/ipfs/go-ipfs/blockservice"
cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
e "github.com/ipfs/go-ipfs/core/commands/e"
corerepo "github.com/ipfs/go-ipfs/core/corerepo"
offline "github.com/ipfs/go-ipfs/exchange/offline"
dag "github.com/ipfs/go-ipfs/merkledag"
path "github.com/ipfs/go-ipfs/path"
pin "github.com/ipfs/go-ipfs/pin"
Expand Down Expand Up @@ -555,7 +557,7 @@ func pinLsAll(typeStr string, ctx context.Context, n *core.IpfsNode) (map[string
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range n.Pinning.RecursiveKeys() {
err := dag.EnumerateChildren(n.Context(), n.DAG.GetLinks, k, set.Visit)
err := dag.EnumerateChildren(n.Context(), dag.GetLinksWithDAG(n.DAG), k, set.Visit)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -594,7 +596,10 @@ type pinVerifyOpts struct {

func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts) <-chan interface{} {
visited := make(map[string]PinStatus)
getLinks := n.DAG.GetOfflineLinkService().GetLinks

bs := n.Blocks.Blockstore()
DAG := dag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
getLinks := dag.GetLinksWithDAG(DAG)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the new way to do offline stuff. We could also use the context trick if we get tired of this dance.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is 3 lines instead of 1; ergonomically it's not an improvement...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussion: #4009

So, first, I made implementing LinkService optional. If your NodeGetter (the get side of the DAGService) doesn't implement the LinkService optimizations, it doesn't have to implement the interface.

Next, I tried adding a OfflineNodeGetter method to the NodeGetter interface. However, that really tricky and messy:

  1. We realized that we'd probably want Offline* methods on every related service (which would kind of suck).
  2. Things like bitswap sessions (which I wanted to turn into valid NodeGetters would also have to implement this method). I simply couldn't find a clean way to do this.

So, in #4009, I suggested adding an "offline" flag to the context. I still think that's the best way to go because I keep running into similar cases where I want to try to do something without touching the network but @kevina reasonably objected that it was a bit magical and easy to screw up.

Hence this solution.

I could add a helper function but, really, I kind of want this to be ugly. I'm not happy with this solution at all and don't want this pattern repeated all over the codebase. For one, it makes the LinkService optimization totally useless because we don't reuse the DAG (we construct a new one with the blockstore when we do the GC).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair enough.

recPins := n.Pinning.RecursiveKeys()

var checkPin func(root *cid.Cid) PinStatus
Expand Down
4 changes: 2 additions & 2 deletions core/commands/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ This command outputs data in the following encodings:
if discover {
go func() {
blk := blocks.NewBlock([]byte("floodsub:" + topic))
cid, err := n.Blocks.AddBlock(blk)
err := n.Blocks.AddBlock(blk)
if err != nil {
log.Error("pubsub discovery: ", err)
return
}

connectToPubSubPeers(req.Context, n, cid)
connectToPubSubPeers(req.Context, n, blk.Cid())
}()
}

Expand Down
Loading