diff --git a/caboose.go b/caboose.go
index 045ed1d..73bb6cc 100644
--- a/caboose.go
+++ b/caboose.go
@@ -80,6 +80,8 @@ type Config struct {
 	SaturnNodeCoolOff time.Duration
 
 	TieredHashingOpts []tieredhashing.Option
+
+	SentinelCidPeriod int64
 }
 
 const DefaultLoggingInterval = 5 * time.Second
@@ -106,6 +108,8 @@ const DefaultFetchKeyCoolDownDuration = 1 * time.Minute // how long will a sane
 // however, only upto a certain max number of cool-offs.
 const DefaultSaturnNodeCoolOff = 5 * time.Minute
 
+const DefaultSentinelCidPeriod = int64(200)
+
 var ErrNotImplemented error = errors.New("not implemented")
 var ErrNoBackend error = errors.New("no available saturn backend")
 var ErrBackendFailed error = errors.New("saturn backend failed")
@@ -232,6 +236,10 @@ func NewCaboose(config *Config) (*Caboose, error) {
 		}
 	}
 
+	if c.config.SentinelCidPeriod == 0 {
+		c.config.SentinelCidPeriod = DefaultSentinelCidPeriod
+	}
+
 	if c.config.PoolRefresh == 0 {
 		c.config.PoolRefresh = DefaultPoolRefreshInterval
 	}
diff --git a/fetcher.go b/fetcher.go
index 79c87ee..7c6c54d 100644
--- a/fetcher.go
+++ b/fetcher.go
@@ -2,12 +2,10 @@ package caboose
 
 import (
 	"context"
-	"crypto/rand"
 	"errors"
 	"fmt"
 	"hash/crc32"
 	"io"
-	"math/big"
 	"net/http"
 	"os"
 	"strconv"
@@ -51,16 +49,6 @@ var (
 func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int) (b blocks.Block, rm tieredhashing.ResponseMetrics, e error) {
 	reqUrl := fmt.Sprintf(saturnReqTmpl, c)
 
-	rand, _ := rand.Int(rand.Reader, big.NewInt(sentinelCidPeriod))
-	if rand == big.NewInt(1) {
-		sc, _ := p.th.GetSentinelCid(from)
-		if len(sc) > 0 {
-			sentinelCid, _ := cid.Decode(sc)
-			sentinelReqUrl := fmt.Sprintf(saturnReqTmpl, sentinelCid)
-			go p.fetchResource(ctx, from, sentinelReqUrl, "application/vnd.ipld.raw", attempt, func(rsrc string, r io.Reader) error { return nil })
-		}
-	}
-
 	rm, e = p.fetchResource(ctx, from, reqUrl, "application/vnd.ipld.raw", attempt, func(rsrc string, r io.Reader) error {
 		block, err := io.ReadAll(io.LimitReader(r, maxBlockSize))
 		if err != nil {
diff --git a/pool.go b/pool.go
index 6cfe15a..f778c91 100644
--- a/pool.go
+++ b/pool.go
@@ -36,6 +36,8 @@ const (
 	CabooseJwtIssuer = "caboose-client"
 )
 
+var sentinelCidReqTemplate = "/ipfs/%s?format=car&car-scope=block"
+
 // authenticateReq adds authentication to a request when a JWT_SECRET is present as an environment variable.
 func authenticateReq(req *http.Request, key string) (*http.Request, error) {
@@ -243,7 +245,7 @@ func (p *pool) fetchSentinelCid(node string) error {
 		return err
 	}
 	trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second)
-	reqUrl := fmt.Sprintf(saturnReqTmpl, sc)
+	reqUrl := fmt.Sprintf(sentinelCidReqTemplate, sc)
 	err = p.fetchResourceAndUpdate(trialTimeout, node, reqUrl, 0, p.mirrorValidator)
 	cancel()
 	return err
@@ -265,10 +267,15 @@ func (p *pool) checkPool() {
 			continue
 		}
 		trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+
 		err := p.fetchResourceAndUpdate(trialTimeout, testNodes[0], msg.path, 0, p.mirrorValidator)
 
-		rand, _ := cryptoRand.Int(cryptoRand.Reader, big.NewInt(sentinelCidPeriod))
-		if rand == big.NewInt(1) {
+		rand := big.NewInt(1)
+		if p.config.SentinelCidPeriod > 0 {
+			rand, _ = cryptoRand.Int(cryptoRand.Reader, big.NewInt(p.config.SentinelCidPeriod))
+		}
+
+		if rand.Cmp(big.NewInt(0)) == 0 {
 			err := p.fetchSentinelCid(testNodes[0])
 			if err != nil {
 				goLogger.Warnw("failed to fetch sentinel cid ", "err", err)
@@ -347,6 +354,7 @@ func cidToKey(c cid.Cid) string {
 }
 
 func (p *pool) fetchBlockWith(ctx context.Context, c cid.Cid, with string) (blk blocks.Block, err error) {
+
 	fetchCalledTotalMetric.WithLabelValues(resourceTypeBlock).Add(1)
 	if recordIfContextErr(resourceTypeBlock, ctx, "fetchBlockWith") {
 		return nil, ctx.Err()
diff --git a/pool_test.go b/pool_test.go
index 248c887..de5af1d 100644
--- a/pool_test.go
+++ b/pool_test.go
@@ -74,6 +74,7 @@ func TestPoolMiroring(t *testing.T) {
 	ph := BuildPoolHarness(t, 2, opts)
 
 	p := ph.p
+	p.config.SentinelCidPeriod = 0
 	nodes := ph.p.config.OrchestratorOverride
 	p.doRefresh()
 	p.config.OrchestratorOverride = nil
@@ -199,18 +200,53 @@ func TestAuthenticateReq(t *testing.T) {
 	assert.True(t, time.Now().Unix() < int64(expiresAt), "Token should not have expired")
 }
 
-// TODO: fix this test
-// func TestFetchSentinelCid(t *testing.T) {
-
-// 	ph := BuildPoolHarness(t, 10, nil)
-// 	ph.p.th.AddOrchestratorNodes(ph.p.config.OrchestratorOverride)
-// 	for _, node := range(ph.p.config.OrchestratorOverride) {
-// 		err := ph.p.fetchSentinelCid(node.IP)
-// 		if err != nil {
-// 			t.Fatal(err)
-// 		}
-// 	}
-// }
+func TestFetchSentinelCid(t *testing.T) {
+	if unsafe.Sizeof(unsafe.Pointer(nil)) <= 4 {
+		t.Skip("skipping for 32bit architectures because too slow")
+	}
+	opts := []tieredhashing.Option{
+		tieredhashing.WithCorrectnessWindowSize(2),
+		tieredhashing.WithLatencyWindowSize(2),
+		tieredhashing.WithMaxMainTierSize(1),
+	}
+	ph := BuildPoolHarness(t, 2, opts)
+
+	p := ph.p
+	p.config.SentinelCidPeriod = 1
+	nodes := ph.p.config.OrchestratorOverride
+	p.doRefresh()
+	p.config.OrchestratorOverride = nil
+	p.Start()
+
+	// promote one node to main pool. other will remain in unknown pool.
+	eURL := nodes[0].IP
+	p.th.RecordSuccess(eURL, tieredhashing.ResponseMetrics{Success: true, TTFBMs: 30, SpeedPerMs: 30})
+	p.th.RecordSuccess(eURL, tieredhashing.ResponseMetrics{Success: true, TTFBMs: 30, SpeedPerMs: 30})
+	p.th.UpdateMainTierWithTopN()
+
+	ls := cidlink.DefaultLinkSystem()
+	lsm := memstore.Store{}
+	ls.SetReadStorage(&lsm)
+	ls.SetWriteStorage(&lsm)
+	finalCL := ls.MustStore(ipld.LinkContext{}, cidlink.LinkPrototype{Prefix: cid.NewPrefixV1(uint64(multicodec.Raw), uint64(multicodec.Sha2_256))}, basicnode.NewBytes(testBlock))
+	finalC := finalCL.(cidlink.Link).Cid
+
+	_, err := p.fetchBlockWith(context.Background(), finalC, "")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	time.Sleep(100 * time.Millisecond)
+	p.Close()
+
+	e := ph.eps[1]
+	e.lk.Lock()
+	defer e.lk.Unlock()
+
+	if e.cnt != 2 {
+		t.Fatalf("expected 2 primary fetch, got %d", e.cnt)
+	}
+}
 
 type PoolHarness struct {
 	p *pool
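
Not part of the diff above: a minimal sketch of how a caller might set the new SentinelCidPeriod knob. NewCaboose, Config, SentinelCidPeriod, and DefaultSentinelCidPeriod come from the diff itself; the module path github.com/filecoin-saturn/caboose and the omission of every other Config field are assumptions made only for illustration.

	package main

	import (
		"log"

		"github.com/filecoin-saturn/caboose" // assumed module path
	)

	func main() {
		// Sketch only: a real Config also needs orchestrator/client settings,
		// which are omitted here.
		cb, err := caboose.NewCaboose(&caboose.Config{
			// Probe a test node with a sentinel CID roughly once in every
			// SentinelCidPeriod mirrored pool checks; leaving this at 0 falls
			// back to DefaultSentinelCidPeriod (200).
			SentinelCidPeriod: 500,
		})
		if err != nil {
			log.Fatal(err)
		}
		_ = cb
	}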