Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
add fetch sentinel cid tests
Browse files Browse the repository at this point in the history
  • Loading branch information
AmeanAsad committed Jun 19, 2023
1 parent d741115 commit 9e687a1
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 27 deletions.
8 changes: 8 additions & 0 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ type Config struct {
SaturnNodeCoolOff time.Duration

TieredHashingOpts []tieredhashing.Option

SentinelCidPeriod int64
}

const DefaultLoggingInterval = 5 * time.Second
Expand All @@ -106,6 +108,8 @@ const DefaultFetchKeyCoolDownDuration = 1 * time.Minute // how long will a sane
// however, only upto a certain max number of cool-offs.
const DefaultSaturnNodeCoolOff = 5 * time.Minute

const DefaultSentinelCidPeriod = int64(200)

var ErrNotImplemented error = errors.New("not implemented")
var ErrNoBackend error = errors.New("no available saturn backend")
var ErrBackendFailed error = errors.New("saturn backend failed")
Expand Down Expand Up @@ -232,6 +236,10 @@ func NewCaboose(config *Config) (*Caboose, error) {
}
}

if c.config.SentinelCidPeriod == 0 {
c.config.SentinelCidPeriod = DefaultSentinelCidPeriod
}

if c.config.PoolRefresh == 0 {
c.config.PoolRefresh = DefaultPoolRefreshInterval
}
Expand Down
12 changes: 0 additions & 12 deletions fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package caboose

import (
"context"
"crypto/rand"
"errors"
"fmt"
"hash/crc32"
"io"
"math/big"
"net/http"
"os"
"strconv"
Expand Down Expand Up @@ -51,16 +49,6 @@ var (
func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int) (b blocks.Block, rm tieredhashing.ResponseMetrics, e error) {
reqUrl := fmt.Sprintf(saturnReqTmpl, c)

rand, _ := rand.Int(rand.Reader, big.NewInt(sentinelCidPeriod))
if rand == big.NewInt(1) {
sc, _ := p.th.GetSentinelCid(from)
if len(sc) > 0 {
sentinelCid, _ := cid.Decode(sc)
sentinelReqUrl := fmt.Sprintf(saturnReqTmpl, sentinelCid)
go p.fetchResource(ctx, from, sentinelReqUrl, "application/vnd.ipld.raw", attempt, func(rsrc string, r io.Reader) error { return nil })
}
}

rm, e = p.fetchResource(ctx, from, reqUrl, "application/vnd.ipld.raw", attempt, func(rsrc string, r io.Reader) error {
block, err := io.ReadAll(io.LimitReader(r, maxBlockSize))
if err != nil {
Expand Down
14 changes: 11 additions & 3 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
CabooseJwtIssuer = "caboose-client"
)

var sentinelCidReqTemplate = "/ipfs/%s?format=car&car-scope=block"

// authenticateReq adds authentication to a request when a JWT_SECRET is present as an environment variable.
func authenticateReq(req *http.Request, key string) (*http.Request, error) {

Expand Down Expand Up @@ -243,7 +245,7 @@ func (p *pool) fetchSentinelCid(node string) error {
return err
}
trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second)
reqUrl := fmt.Sprintf(saturnReqTmpl, sc)
reqUrl := fmt.Sprintf(sentinelCidReqTemplate, sc)
err = p.fetchResourceAndUpdate(trialTimeout, node, reqUrl, 0, p.mirrorValidator)
cancel()
return err
Expand All @@ -265,10 +267,15 @@ func (p *pool) checkPool() {
continue
}
trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second)

err := p.fetchResourceAndUpdate(trialTimeout, testNodes[0], msg.path, 0, p.mirrorValidator)

rand, _ := cryptoRand.Int(cryptoRand.Reader, big.NewInt(sentinelCidPeriod))
if rand == big.NewInt(1) {
rand := big.NewInt(1)
if p.config.SentinelCidPeriod > 0 {
rand, _ = cryptoRand.Int(cryptoRand.Reader, big.NewInt(p.config.SentinelCidPeriod))
}

if rand.Cmp(big.NewInt(0)) == 0 {
err := p.fetchSentinelCid(testNodes[0])
if err != nil {
goLogger.Warnw("failed to fetch sentinel cid ", "err", err)
Expand Down Expand Up @@ -347,6 +354,7 @@ func cidToKey(c cid.Cid) string {
}

func (p *pool) fetchBlockWith(ctx context.Context, c cid.Cid, with string) (blk blocks.Block, err error) {

fetchCalledTotalMetric.WithLabelValues(resourceTypeBlock).Add(1)
if recordIfContextErr(resourceTypeBlock, ctx, "fetchBlockWith") {
return nil, ctx.Err()
Expand Down
60 changes: 48 additions & 12 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func TestPoolMiroring(t *testing.T) {
ph := BuildPoolHarness(t, 2, opts)

p := ph.p
p.config.SentinelCidPeriod = 0
nodes := ph.p.config.OrchestratorOverride
p.doRefresh()
p.config.OrchestratorOverride = nil
Expand Down Expand Up @@ -199,18 +200,53 @@ func TestAuthenticateReq(t *testing.T) {
assert.True(t, time.Now().Unix() < int64(expiresAt), "Token should not have expired")
}

// TODO: fix this test
// func TestFetchSentinelCid(t *testing.T) {

// ph := BuildPoolHarness(t, 10, nil)
// ph.p.th.AddOrchestratorNodes(ph.p.config.OrchestratorOverride)
// for _, node := range(ph.p.config.OrchestratorOverride) {
// err := ph.p.fetchSentinelCid(node.IP)
// if err != nil {
// t.Fatal(err)
// }
// }
// }
func TestFetchSentinelCid(t *testing.T) {
if unsafe.Sizeof(unsafe.Pointer(nil)) <= 4 {
t.Skip("skipping for 32bit architectures because too slow")
}
opts := []tieredhashing.Option{
tieredhashing.WithCorrectnessWindowSize(2),
tieredhashing.WithLatencyWindowSize(2),
tieredhashing.WithMaxMainTierSize(1),
}
ph := BuildPoolHarness(t, 2, opts)

p := ph.p
p.config.SentinelCidPeriod = 1
nodes := ph.p.config.OrchestratorOverride
p.doRefresh()
p.config.OrchestratorOverride = nil
p.Start()

// promote one node to main pool. other will remain in uknown pool.
eURL := nodes[0].IP
p.th.RecordSuccess(eURL, tieredhashing.ResponseMetrics{Success: true, TTFBMs: 30, SpeedPerMs: 30})
p.th.RecordSuccess(eURL, tieredhashing.ResponseMetrics{Success: true, TTFBMs: 30, SpeedPerMs: 30})
p.th.UpdateMainTierWithTopN()

ls := cidlink.DefaultLinkSystem()
lsm := memstore.Store{}
ls.SetReadStorage(&lsm)
ls.SetWriteStorage(&lsm)
finalCL := ls.MustStore(ipld.LinkContext{}, cidlink.LinkPrototype{Prefix: cid.NewPrefixV1(uint64(multicodec.Raw), uint64(multicodec.Sha2_256))}, basicnode.NewBytes(testBlock))
finalC := finalCL.(cidlink.Link).Cid

_, err := p.fetchBlockWith(context.Background(), finalC, "")
if err != nil {
t.Fatal(err)
}

time.Sleep(100 * time.Millisecond)
p.Close()

e := ph.eps[1]
e.lk.Lock()
defer e.lk.Unlock()

if e.cnt != 2 {
t.Fatalf("expected 2 primary fetch, got %d", e.cnt)
}
}

type PoolHarness struct {
p *pool
Expand Down

0 comments on commit 9e687a1

Please sign in to comment.