Skip to content

Commit

Permalink
fix(share/p2p)!: share.ErrNamespaceNotFound and integration into shre…
Browse files Browse the repository at this point in the history
…x-nd (#2230)

This PR replaces #2156 , as gh was not showing the diff properly. It
closes #2145

---------

Co-authored-by: Hlib Kanunnikov <hlibwondertan@gmail.com>
  • Loading branch information
distractedm1nd and Wondertan committed May 19, 2023
1 parent bd70494 commit 62a0b97
Show file tree
Hide file tree
Showing 14 changed files with 117 additions and 58 deletions.
2 changes: 1 addition & 1 deletion share/availability/light/availability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestService_GetSharesByNamespaceNotFound(t *testing.T) {
root.RowsRoots = nil

_, err := getter.GetSharesByNamespace(context.Background(), root, []byte{1, 1, 1, 1, 1, 1, 1, 1})
assert.ErrorIs(t, err, share.ErrNotFound)
assert.ErrorIs(t, err, share.ErrNamespaceNotFound)
}

func BenchmarkService_GetSharesByNamespace(b *testing.B) {
Expand Down
9 changes: 7 additions & 2 deletions share/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@ import (
"github.com/celestiaorg/rsmt2d"
)

// ErrNotFound is used to indicated that requested data could not be found.
var ErrNotFound = errors.New("data not found")
var (
// ErrNotFound is used to indicate that requested data could not be found.
ErrNotFound = errors.New("share: data not found")
// ErrNamespaceNotFound is returned by GetSharesByNamespace when data for requested root does
// not include any shares from the given namespace
ErrNamespaceNotFound = errors.New("share: namespace not found in data")
)

// Getter interface provides a set of accessors for shares by the Root.
// Automatically verifies integrity of shares(exceptions possible depending on the implementation).
Expand Down
3 changes: 3 additions & 0 deletions share/getters/cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ func cascadeGetters[V any](
if getErr == nil {
return val, nil
}
if errors.Is(share.ErrNamespaceNotFound, getErr) {
return zero, getErr
}

if !errors.Is(getErr, errOperationNotSupported) {
err = errors.Join(err, getErr)
Expand Down
12 changes: 7 additions & 5 deletions share/getters/getter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestStoreGetter(t *testing.T) {
// nid not found
nID = make([]byte, 8)
_, err = sg.GetSharesByNamespace(ctx, &dah, nID)
require.ErrorIs(t, err, share.ErrNotFound)
require.ErrorIs(t, err, share.ErrNamespaceNotFound)

// root not found
root := share.Root{}
Expand Down Expand Up @@ -206,20 +206,22 @@ func TestIPLDGetter(t *testing.T) {
err = edsStore.Put(ctx, dah.Hash(), eds)
require.NoError(t, err)

// first check that shares are returned correctly if they exist
shares, err := sg.GetSharesByNamespace(ctx, &dah, nID)
require.NoError(t, err)
require.NoError(t, shares.Verify(&dah, nID))
assert.Len(t, shares.Flatten(), 2)

// nid not found
nID = make([]byte, 8)
_, err = sg.GetSharesByNamespace(ctx, &dah, nID)
require.ErrorIs(t, err, share.ErrNotFound)
emptyShares, err := sg.GetSharesByNamespace(ctx, &dah, nID)
require.ErrorIs(t, err, share.ErrNamespaceNotFound)
require.Nil(t, emptyShares)

// root not found
// nid doesnt exist in root
root := share.Root{}
_, err = sg.GetSharesByNamespace(ctx, &root, nID)
require.ErrorIs(t, err, share.ErrNotFound)
require.ErrorIs(t, err, share.ErrNamespaceNotFound)
})
}

Expand Down
11 changes: 9 additions & 2 deletions share/getters/shrex.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,13 @@ func (sg *ShrexGetter) GetSharesByNamespace(
attempt int
err error
)

// verify that the namespace could exist inside the roots before starting network requests
roots := filterRootsByNamespace(root, id)
if len(roots) == 0 {
return nil, share.ErrNamespaceNotFound
}

for {
if ctx.Err() != nil {
sg.metrics.recordNDAttempt(ctx, attempt, false)
Expand All @@ -210,10 +217,10 @@ func (sg *ShrexGetter) GetSharesByNamespace(
nd, getErr := sg.ndClient.RequestND(reqCtx, root, id, peer)
cancel()
switch {
case getErr == nil:
case getErr == nil, errors.Is(getErr, share.ErrNamespaceNotFound):
setStatus(peers.ResultNoop)
sg.metrics.recordNDAttempt(ctx, attempt, true)
return nd, nil
return nd, getErr
case errors.Is(getErr, context.DeadlineExceeded),
errors.Is(getErr, context.Canceled):
setStatus(peers.ResultCooldownPeer)
Expand Down
28 changes: 25 additions & 3 deletions share/getters/shrex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,25 @@ func TestShrexGetter(t *testing.T) {
require.ErrorIs(t, err, share.ErrNotFound)
})

t.Run("ND_namespace_not_found", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
t.Cleanup(cancel)

// generate test data
eds, dah, nID := generateTestEDS(t)
require.NoError(t, edsStore.Put(ctx, dah.Hash(), eds))
peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{
DataHash: dah.Hash(),
Height: 1,
})

// corrupt NID
nID[4]++

_, err := getter.GetSharesByNamespace(ctx, &dah, nID)
require.ErrorIs(t, err, share.ErrNamespaceNotFound)
})

t.Run("EDS_Available", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
t.Cleanup(cancel)
Expand Down Expand Up @@ -150,7 +169,8 @@ func generateTestEDS(t *testing.T) (*rsmt2d.ExtendedDataSquare, da.DataAvailabil
return eds, dah, randNID
}

func testManager(ctx context.Context, host host.Host, headerSub libhead.Subscriber[*header.ExtendedHeader],
func testManager(
ctx context.Context, host host.Host, headerSub libhead.Subscriber[*header.ExtendedHeader],
) (*peers.Manager, error) {
shrexSub, err := shrexsub.NewPubSub(ctx, host, "test")
if err != nil {
Expand All @@ -177,7 +197,8 @@ func testManager(ctx context.Context, host host.Host, headerSub libhead.Subscrib
return manager, err
}

func newNDClientServer(ctx context.Context, t *testing.T, edsStore *eds.Store, srvHost, clHost host.Host,
func newNDClientServer(
ctx context.Context, t *testing.T, edsStore *eds.Store, srvHost, clHost host.Host,
) (*shrexnd.Client, *shrexnd.Server) {
params := shrexnd.DefaultParameters()

Expand All @@ -196,7 +217,8 @@ func newNDClientServer(ctx context.Context, t *testing.T, edsStore *eds.Store, s
return client, server
}

func newEDSClientServer(ctx context.Context, t *testing.T, edsStore *eds.Store, srvHost, clHost host.Host,
func newEDSClientServer(
ctx context.Context, t *testing.T, edsStore *eds.Store, srvHost, clHost host.Host,
) (*shrexeds.Client, *shrexeds.Server) {
params := shrexeds.DefaultParameters()

Expand Down
1 change: 1 addition & 0 deletions share/getters/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,5 +128,6 @@ func (sg *StoreGetter) GetSharesByNamespace(
if err != nil {
return nil, fmt.Errorf("getter/store: failed to retrieve shares by namespace: %w", err)
}

return shares, nil
}
6 changes: 3 additions & 3 deletions share/getters/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func collectSharesByNamespace(

rootCIDs := filterRootsByNamespace(root, nID)
if len(rootCIDs) == 0 {
return nil, share.ErrNotFound
return nil, share.ErrNamespaceNotFound
}

errGroup, ctx := errgroup.WithContext(ctx)
Expand All @@ -84,9 +84,9 @@ func collectSharesByNamespace(
return nil, err
}

// return ErrNotFound if no shares are found for namespaceID
// return ErrNamespaceNotFound if no shares are found for the namespace.ID
if len(rootCIDs) == 1 && len(shares[0].Shares) == 0 {
return nil, share.ErrNotFound
return nil, share.ErrNamespaceNotFound
}

return shares, nil
Expand Down
4 changes: 3 additions & 1 deletion share/p2p/shrexnd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (c *Client) RequestND(
return nil, context.DeadlineExceeded
}
}
if err != p2p.ErrNotFound {
if err != p2p.ErrNotFound && err != share.ErrNamespaceNotFound {
log.Warnw("client-nd: peer returned err", "err", err)
}
return nil, err
Expand Down Expand Up @@ -195,6 +195,8 @@ func (c *Client) statusToErr(ctx context.Context, code pb.StatusCode) error {
case pb.StatusCode_NOT_FOUND:
c.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound)
return p2p.ErrNotFound
case pb.StatusCode_NAMESPACE_NOT_FOUND:
return share.ErrNamespaceNotFound
case pb.StatusCode_INVALID:
log.Debug("client-nd: invalid request")
fallthrough
Expand Down
6 changes: 3 additions & 3 deletions share/p2p/shrexnd/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestExchange_RequestND_NotFound(t *testing.T) {
require.ErrorIs(t, err, p2p.ErrNotFound)
})

t.Run("Getter_err_not_found", func(t *testing.T) {
t.Run("ErrNamespaceNotFound", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
t.Cleanup(cancel)

Expand All @@ -49,7 +49,7 @@ func TestExchange_RequestND_NotFound(t *testing.T) {

randNID := dah.RowsRoots[(len(dah.RowsRoots)-1)/2][:8]
_, err := client.RequestND(ctx, &dah, randNID, server.host.ID())
require.ErrorIs(t, err, p2p.ErrNotFound)
require.ErrorIs(t, err, share.ErrNamespaceNotFound)
})
}

Expand Down Expand Up @@ -118,7 +118,7 @@ func (m notFoundGetter) GetEDS(
func (m notFoundGetter) GetSharesByNamespace(
_ context.Context, _ *share.Root, _ namespace.ID,
) (share.NamespacedShares, error) {
return nil, share.ErrNotFound
return nil, share.ErrNamespaceNotFound
}

func newStore(t *testing.T) *eds.Store {
Expand Down
2 changes: 1 addition & 1 deletion share/p2p/shrexnd/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/celestiaorg/celestia-node/share/p2p"
)

const protocolString = "/shrex/nd/0.0.1"
const protocolString = "/shrex/nd/v0.0.2"

var log = logging.Logger("shrex/nd")

Expand Down
70 changes: 37 additions & 33 deletions share/p2p/shrexnd/pb/share.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions share/p2p/shrexnd/pb/share.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ enum StatusCode {
OK = 1;
NOT_FOUND = 2;
INTERNAL = 3;
NAMESPACE_NOT_FOUND = 4;
};

message Row {
Expand Down
20 changes: 16 additions & 4 deletions share/p2p/shrexnd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,15 @@ func (srv *Server) handleNamespacedData(ctx context.Context, stream network.Stre
}

shares, err := srv.getter.GetSharesByNamespace(ctx, dah, req.NamespaceId)
if errors.Is(err, share.ErrNotFound) {
switch {
case errors.Is(err, share.ErrNotFound):
logger.Warn("server: nd not found")
srv.respondNotFoundError(ctx, logger, stream)
return
}
if err != nil {
case errors.Is(err, share.ErrNamespaceNotFound):
srv.respondNamespaceNotFoundError(ctx, logger, stream)
return
case err != nil:
logger.Errorw("server: retrieving shares", "err", err)
srv.respondInternalError(ctx, logger, stream)
return
Expand All @@ -157,7 +160,7 @@ func validateRequest(req pb.GetSharesByNamespaceRequest) error {
return nil
}

// respondNotFoundError sends internal error response to client
// respondNotFoundError sends a not found response to client
func (srv *Server) respondNotFoundError(ctx context.Context,
logger *zap.SugaredLogger, stream network.Stream) {
resp := &pb.GetSharesByNamespaceResponse{
Expand All @@ -166,6 +169,15 @@ func (srv *Server) respondNotFoundError(ctx context.Context,
srv.respond(ctx, logger, stream, resp)
}

// respondNamespaceNotFoundError sends a namespace not found response to client
func (srv *Server) respondNamespaceNotFoundError(ctx context.Context,
logger *zap.SugaredLogger, stream network.Stream) {
resp := &pb.GetSharesByNamespaceResponse{
Status: pb.StatusCode_NAMESPACE_NOT_FOUND,
}
srv.respond(ctx, logger, stream, resp)
}

// respondInternalError sends internal error response to client
func (srv *Server) respondInternalError(ctx context.Context,
logger *zap.SugaredLogger, stream network.Stream) {
Expand Down

0 comments on commit 62a0b97

Please sign in to comment.