Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add check using only a CID #47

Merged
merged 14 commits into from
Aug 30, 2024
32 changes: 30 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,38 @@ Note that the `multiaddr` can be:

### Check results

The server performs several checks given a CID. The results of the check are expressed by the `output` type:
The server performs several checks depending on whether you also pass a **multiaddr** or just a **cid**.

#### Results when only a `cid` is passed

The results of the check are expressed by the `cidCheckOutput` type:

```go
type cidCheckOutput *[]providerOutput

type providerOutput struct {
ID string
ConnectionError string
Addrs []string
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
}
```

The `providerOutput` type contains the following fields:

- `ID`: The peer ID of the provider.
- `ConnectionError`: An error message if the connection to the provider failed.
- `Addrs`: The multiaddrs of the provider from the DHT.
- `ConnectionMaddrs`: The multiaddrs that were used to connect to the provider.
- `DataAvailableOverBitswap`: The result of the Bitswap check.

#### Results when a `multiaddr` and a `cid` are passed

The results of the check are expressed by the `peerCheckOutput` type:

```go
type output struct {
type peerCheckOutput struct {
ConnectionError string
PeerFoundInDHT map[string]int
CidInDHT bool
Expand Down
189 changes: 145 additions & 44 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

import (
"context"
"errors"
"fmt"
"log"
"net/url"
"sync"
"time"

Expand All @@ -20,10 +18,12 @@
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/prometheus/client_golang/prometheus"
)

type kademlia interface {
Expand All @@ -36,8 +36,13 @@
dht kademlia
dhtMessenger *dhtpb.ProtocolMessenger
createTestHost func() (host.Host, error)
promRegistry *prometheus.Registry
}

// number of providers at which to stop looking for providers in the DHT
// When doing a check only with a CID
var MaxProvidersCount = 10

func newDaemon(ctx context.Context, acceleratedDHT bool) (*daemon, error) {
rm, err := NewResourceManager()
if err != nil {
Expand All @@ -49,13 +54,17 @@
return nil, err
}

// Create a custom registry for all prometheus metrics
promRegistry := prometheus.NewRegistry()

Check warning on line 58 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L58

Added line #L58 was not covered by tests

h, err := libp2p.New(
libp2p.DefaultMuxers,
libp2p.Muxer(mplex.ID, mplex.DefaultTransport),
libp2p.ConnectionManager(c),
libp2p.ConnectionGater(&privateAddrFilterConnectionGater{}),
libp2p.ResourceManager(rm),
libp2p.EnableHolePunching(),
libp2p.PrometheusRegisterer(promRegistry),

Check warning on line 67 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L67

Added line #L67 was not covered by tests
libp2p.UserAgent(userAgent),
)
if err != nil {
Expand Down Expand Up @@ -88,15 +97,20 @@
return nil, err
}

return &daemon{h: h, dht: d, dhtMessenger: pm, createTestHost: func() (host.Host, error) {
return libp2p.New(
libp2p.ConnectionGater(&privateAddrFilterConnectionGater{}),
libp2p.DefaultMuxers,
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
libp2p.EnableHolePunching(),
libp2p.UserAgent(userAgent),
)
}}, nil
return &daemon{
h: h,
dht: d,
dhtMessenger: pm,
promRegistry: promRegistry,
createTestHost: func() (host.Host, error) {
return libp2p.New(
libp2p.ConnectionGater(&privateAddrFilterConnectionGater{}),
libp2p.DefaultMuxers,
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
libp2p.EnableHolePunching(),
libp2p.UserAgent(userAgent),
)
}}, nil

Check warning on line 113 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L100-L113

Added lines #L100 - L113 were not covered by tests
}

func (d *daemon) mustStart() {
Expand All @@ -109,18 +123,101 @@

}

func (d *daemon) runCheck(query url.Values) (*output, error) {
maStr := query.Get("multiaddr")
cidStr := query.Get("cid")
type cidCheckOutput *[]providerOutput

if maStr == "" {
return nil, errors.New("missing 'multiaddr' argument")
type providerOutput struct {
ID string
ConnectionError string
Addrs []string
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
}

// runCidCheck looks up the DHT for providers of a given CID and then checks their connectivity and Bitswap availability
func (d *daemon) runCidCheck(ctx context.Context, cidStr string) (cidCheckOutput, error) {
cid, err := cid.Decode(cidStr)
if err != nil {
return nil, err

Check warning on line 140 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L140

Added line #L140 was not covered by tests
}

if cidStr == "" {
return nil, errors.New("missing 'cid' argument")
out := make([]providerOutput, 0, MaxProvidersCount)

queryCtx, cancel := context.WithCancel(ctx)
defer cancel()
provsCh := d.dht.FindProvidersAsync(queryCtx, cid, MaxProvidersCount)

var wg sync.WaitGroup
var mu sync.Mutex

for provider := range provsCh {
wg.Add(1)
go func(provider peer.AddrInfo) {
defer wg.Done()

addrs := []string{}
if len(provider.Addrs) > 0 {
for _, addr := range provider.Addrs {
if manet.IsPublicAddr(addr) { // only return public addrs
addrs = append(addrs, addr.String())

Check warning on line 161 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L161

Added line #L161 was not covered by tests
}
}
}

provOutput := providerOutput{
ID: provider.ID.String(),
Addrs: addrs,
DataAvailableOverBitswap: BitswapCheckOutput{},
}

testHost, err := d.createTestHost()
if err != nil {
log.Printf("Error creating test host: %v\n", err)
return

Check warning on line 175 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L174-L175

Added lines #L174 - L175 were not covered by tests
}
defer testHost.Close()

// Test Is the target connectable
dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*15)
defer dialCancel()

testHost.Connect(dialCtx, provider)
// Call NewStream to force NAT hole punching. see https://github.com/libp2p/go-libp2p/issues/2714
_, connErr := testHost.NewStream(dialCtx, provider.ID, "/ipfs/bitswap/1.2.0", "/ipfs/bitswap/1.1.0", "/ipfs/bitswap/1.0.0", "/ipfs/bitswap")

if connErr != nil {
provOutput.ConnectionError = connErr.Error()

Check warning on line 188 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L188

Added line #L188 was not covered by tests
} else {
// since we pass a libp2p host that's already connected to the peer the actual connection maddr we pass in doesn't matter
p2pAddr, _ := multiaddr.NewMultiaddr("/p2p/" + provider.ID.String())
provOutput.DataAvailableOverBitswap = checkBitswapCID(ctx, testHost, cid, p2pAddr)

for _, c := range testHost.Network().ConnsToPeer(provider.ID) {
provOutput.ConnectionMaddrs = append(provOutput.ConnectionMaddrs, c.RemoteMultiaddr().String())
}
}

mu.Lock()
out = append(out, provOutput)
mu.Unlock()
}(provider)
}

// Wait for all goroutines to finish
wg.Wait()

return &out, nil
}

type peerCheckOutput struct {
ConnectionError string
PeerFoundInDHT map[string]int
CidInDHT bool
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
}

// runPeerCheck checks the connectivity and Bitswap availability of a CID from a given peer (either with just peer ID or specific multiaddr)
func (d *daemon) runPeerCheck(ctx context.Context, maStr, cidStr string) (*peerCheckOutput, error) {
ma, err := multiaddr.NewMultiaddr(maStr)
if err != nil {
return nil, err
Expand All @@ -139,12 +236,11 @@
return nil, err
}

ctx := context.Background()
out := &output{}
out := &peerCheckOutput{}

connectionFailed := false

out.CidInDHT = providerRecordInDHT(ctx, d.dht, c, ai.ID)
out.CidInDHT = providerRecordForPeerInDHT(ctx, d.dht, c, ai.ID)

addrMap, peerAddrDHTErr := peerAddrsInDHT(ctx, d.dht, d.dhtMessenger, ai.ID)
out.PeerFoundInDHT = addrMap
Expand Down Expand Up @@ -174,15 +270,28 @@

if !connectionFailed {
// Test Is the target connectable
dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*15)
dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*120)

// we call NewStream instead of Connect to force NAT hole punching
// See https://github.com/libp2p/go-libp2p/issues/2714
testHost.Peerstore().AddAddrs(ai.ID, ai.Addrs, peerstore.RecentlyConnectedAddrTTL)
testHost.Connect(dialCtx, *ai)
// Call NewStream to force NAT hole punching. see https://github.com/libp2p/go-libp2p/issues/2714
_, connErr := testHost.NewStream(dialCtx, ai.ID, "/ipfs/bitswap/1.2.0", "/ipfs/bitswap/1.1.0", "/ipfs/bitswap/1.0.0", "/ipfs/bitswap")
dialCancel()
if connErr != nil {
out.ConnectionError = fmt.Sprintf("error dialing to peer: %s", connErr.Error())
log.Printf("Error connecting to peer %s: %v", ai.ID, connErr)
ids, ok := testHost.(interface{ IDService() identify.IDService })
if ok {
log.Printf("Own observed addrs: %v", ids.IDService().OwnObservedAddrs())

Check warning on line 283 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L280-L283

Added lines #L280 - L283 were not covered by tests
}

// Log all open connections
for _, conn := range testHost.Network().Conns() {
log.Printf("Open connection: Peer ID: %s, Remote Addr: %s, Local Addr: %s",
conn.RemotePeer(),
conn.RemoteMultiaddr(),
conn.LocalMultiaddr(),
)

Check warning on line 292 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L287-L292

Added lines #L287 - L292 were not covered by tests
}
out.ConnectionError = connErr.Error()

Check warning on line 294 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L294

Added line #L294 was not covered by tests
connectionFailed = true
}
}
Expand All @@ -203,8 +312,15 @@
return out, nil
}

type BitswapCheckOutput struct {
Duration time.Duration
Found bool
Responded bool
Error string
}

func checkBitswapCID(ctx context.Context, host host.Host, c cid.Cid, ma multiaddr.Multiaddr) BitswapCheckOutput {
log.Printf("Start of Bitswap check for cid %s by attempting to connect to ma: %v with the temporary peer: %s", c, ma, host.ID())
log.Printf("Start of Bitswap check for cid %s by attempting to connect to ma: %v with the peer: %s", c, ma, host.ID())
out := BitswapCheckOutput{}
start := time.Now()

Expand All @@ -224,21 +340,6 @@
return out
}

type BitswapCheckOutput struct {
Duration time.Duration
Found bool
Responded bool
Error string
}

type output struct {
ConnectionError string
PeerFoundInDHT map[string]int
CidInDHT bool
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
}

func peerAddrsInDHT(ctx context.Context, d kademlia, messenger *dhtpb.ProtocolMessenger, p peer.ID) (map[string]int, error) {
closestPeers, err := d.GetClosestPeers(ctx, string(p))
if err != nil {
Expand Down Expand Up @@ -282,7 +383,7 @@
return addrMap, nil
}

func providerRecordInDHT(ctx context.Context, d kademlia, c cid.Cid, p peer.ID) bool {
func providerRecordForPeerInDHT(ctx context.Context, d kademlia, c cid.Cid, p peer.ID) bool {
queryCtx, cancel := context.WithCancel(ctx)
defer cancel()
provsCh := d.FindProvidersAsync(queryCtx, c, 0)
Expand Down
33 changes: 33 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/multiformats/go-multihash"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -61,6 +63,7 @@ func TestBasicIntegration(t *testing.T) {
require.NoError(t, err)

d := &daemon{
promRegistry: prometheus.NewRegistry(),
h: queryHost,
dht: queryDHT,
dhtMessenger: pm,
Expand Down Expand Up @@ -157,4 +160,34 @@ func TestBasicIntegration(t *testing.T) {
obj.Value("DataAvailableOverBitswap").Object().Value("Found").Boolean().IsFalse()
obj.Value("DataAvailableOverBitswap").Object().Value("Responded").Boolean().IsTrue()
})

t.Run("Data found on reachable peer with just cid", func(t *testing.T) {
testData := []byte(t.Name())
mh, err := multihash.Sum(testData, multihash.SHA2_256, -1)
require.NoError(t, err)
testCid := cid.NewCidV1(cid.Raw, mh)
testBlock, err := blocks.NewBlockWithCid(testData, testCid)
require.NoError(t, err)
err = bstore.Put(ctx, testBlock)
require.NoError(t, err)
err = dhtClient.Provide(ctx, testCid, true)
require.NoError(t, err)

res := test.QueryCid(t, "http://localhost:1234", testCid.String())

res.Length().IsEqual(1)
res.Value(0).Object().Value("ID").String().IsEqual(h.ID().String())
res.Value(0).Object().Value("ConnectionError").String().IsEmpty()
testHostAddrs := h.Addrs()
for _, addr := range testHostAddrs {
if manet.IsPublicAddr(addr) {
res.Value(0).Object().Value("Addrs").Array().ContainsAny(addr.String())
}
}

res.Value(0).Object().Value("ConnectionMaddrs").Array()
res.Value(0).Object().Value("DataAvailableOverBitswap").Object().Value("Error").String().IsEmpty()
res.Value(0).Object().Value("DataAvailableOverBitswap").Object().Value("Found").Boolean().IsTrue()
res.Value(0).Object().Value("DataAvailableOverBitswap").Object().Value("Responded").Boolean().IsTrue()
})
}
Loading
Loading