Skip to content

Commit

Permalink
Update malfeasance.Info to fetch proof from db instead of passing it …
Browse files Browse the repository at this point in the history
…as argument
  • Loading branch information
fasmat committed Dec 4, 2024
1 parent a76a092 commit 8c6bb28
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 46 deletions.
1 change: 0 additions & 1 deletion api/grpcserver/activation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func (s *activationService) Get(ctx context.Context, request *pb.GetRequest) (*p
proof, err := s.atxProvider.MalfeasanceProof(atx.SmesherID)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
ctxzap.Error(ctx, "failed to get malfeasance proof",
zap.Stringer("smesher", atx.SmesherID),
zap.Stringer("smesher", atx.SmesherID),
zap.Stringer("id", atxId),
zap.Error(err),
Expand Down
9 changes: 6 additions & 3 deletions api/grpcserver/mesh_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@ func (s *MeshService) MalfeasanceQuery(
if err != nil && !errors.Is(err, sql.ErrNotFound) {
return nil, status.Error(codes.Internal, err.Error())
}
// TODO(mafa): query malfeasance handler for data instead of extracting from proof bytes
return &pb.MalfeasanceResponse{
Proof: events.ToMalfeasancePB(id, proof, req.IncludeProof),
}, nil
Expand All @@ -627,7 +628,7 @@ func (s *MeshService) MalfeasanceStream(
if sub == nil {
return status.Errorf(codes.FailedPrecondition, "event reporting is not enabled")
}
eventch, fullch := consumeEvents[events.EventMalfeasance](stream.Context(), sub)
eventCh, fullCh := consumeEvents[events.EventMalfeasance](stream.Context(), sub)
if err := stream.SendHeader(metadata.MD{}); err != nil {
return status.Errorf(codes.Unavailable, "can't send header")
}
Expand All @@ -638,6 +639,7 @@ func (s *MeshService) MalfeasanceStream(
case <-stream.Context().Done():
return nil
default:
// TODO(mafa): query malfeasance handler for data instead of extracting from proof bytes
res := &pb.MalfeasanceStreamResponse{
Proof: events.ToMalfeasancePB(id, proof, req.IncludeProof),
}
Expand All @@ -651,9 +653,10 @@ func (s *MeshService) MalfeasanceStream(
select {
case <-stream.Context().Done():
return nil
case <-fullch:
case <-fullCh:
return status.Errorf(codes.Canceled, "buffer is full")
case ev := <-eventch:
case ev := <-eventCh:
// TODO(mafa): query malfeasance handler for data instead of extracting from proof bytes
if err := stream.Send(&pb.MalfeasanceStreamResponse{
Proof: events.ToMalfeasancePB(ev.Smesher, ev.Proof, req.IncludeProof),
}); err != nil {
Expand Down
8 changes: 7 additions & 1 deletion api/grpcserver/v2alpha1/interface.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package v2alpha1

import (
"context"

"github.com/spacemeshos/go-spacemesh/common/types"
)

//go:generate mockgen -typed -package=v2alpha1 -destination=./mocks.go -source=./interface.go

type malfeasanceInfo interface {
Info(data []byte) (map[string]string, error)
Info(ctx context.Context, nodeID types.NodeID) (map[string]string, error)
}
39 changes: 18 additions & 21 deletions api/grpcserver/v2alpha1/malfeasance.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func (s *MalfeasanceService) List(
}

proofs := make([]*spacemeshv2alpha1.MalfeasanceProof, 0, request.Limit)
if err := identities.IterateOps(s.db, ops, func(id types.NodeID, proof []byte, received time.Time) bool {
rst := toProof(ctx, s.info, id, proof)
if err := identities.IterateOps(s.db, ops, func(id types.NodeID, _ []byte, _ time.Time) bool {
rst := toProof(ctx, s.info, id)
if rst == nil {
return true
}
Expand Down Expand Up @@ -149,7 +149,7 @@ func (s *MalfeasanceStreamService) Stream(
select {
// process events first
case rst := <-eventsOut:
proof := toProof(stream.Context(), s.info, rst.Smesher, rst.Proof)
proof := toProof(stream.Context(), s.info, rst.Smesher)
if proof == nil {
continue
}
Expand All @@ -163,7 +163,7 @@ func (s *MalfeasanceStreamService) Stream(
default:
select {
case rst := <-eventsOut:
proof := toProof(stream.Context(), s.info, rst.Smesher, rst.Proof)
proof := toProof(stream.Context(), s.info, rst.Smesher)
if proof == nil {
continue
}
Expand Down Expand Up @@ -209,22 +209,20 @@ func (s *MalfeasanceStreamService) fetchFromDB(

go func() {
defer close(dbChan)
if err := identities.IterateOps(s.db, ops,
func(id types.NodeID, proof []byte, received time.Time) bool {
rst := toProof(ctx, s.info, id, proof)
if rst == nil {
return true
}
if err := identities.IterateOps(s.db, ops, func(id types.NodeID, _ []byte, _ time.Time) bool {
rst := toProof(ctx, s.info, id)
if rst == nil {
return true
}

select {
case dbChan <- rst:
return true
case <-ctx.Done():
// exit if the context is canceled
return false
}
},
); err != nil {
select {
case dbChan <- rst:
return true
case <-ctx.Done():
// exit if the context is canceled
return false
}
}); err != nil {
errChan <- status.Error(codes.Internal, err.Error())
}
}()
Expand All @@ -235,9 +233,8 @@ func toProof(
ctx context.Context,
info malfeasanceInfo,
id types.NodeID,
proof []byte,
) *spacemeshv2alpha1.MalfeasanceProof {
properties, err := info.Info(proof)
properties, err := info.Info(ctx, id)
if err != nil {
ctxzap.Debug(ctx, "failed to get malfeasance info",
zap.String("smesher", id.String()),
Expand Down
6 changes: 3 additions & 3 deletions api/grpcserver/v2alpha1/malfeasance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestMalfeasanceService_List(t *testing.T) {
"type": strconv.FormatUint(uint64(i%4+1), 10),
fmt.Sprintf("key%d", i): fmt.Sprintf("value%d", i),
}
info.EXPECT().Info(proofs[i].Proof).Return(proofs[i].Properties, nil).AnyTimes()
info.EXPECT().Info(gomock.Any(), proofs[i].ID).Return(proofs[i].Properties, nil).AnyTimes()

require.NoError(t, identities.SetMalicious(db, proofs[i].ID, proofs[i].Proof, time.Now()))
}
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestMalfeasanceStreamService_Stream(t *testing.T) {
"type": strconv.FormatUint(uint64(i%4+1), 10),
fmt.Sprintf("key%d", i): fmt.Sprintf("value%d", i),
}
info.EXPECT().Info(proofs[i].Proof).Return(proofs[i].Properties, nil).AnyTimes()
info.EXPECT().Info(gomock.Any(), proofs[i].ID).Return(proofs[i].Properties, nil).AnyTimes()

require.NoError(t, identities.SetMalicious(db, proofs[i].ID, proofs[i].Proof, time.Now()))
}
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestMalfeasanceStreamService_Stream(t *testing.T) {
"type": strconv.FormatUint(uint64(i%4+1), 10),
fmt.Sprintf("key%d", i): fmt.Sprintf("value%d", i),
}
info.EXPECT().Info(streamed[i].Proof).Return(properties, nil).AnyTimes()
info.EXPECT().Info(gomock.Any(), streamed[i].Smesher).Return(properties, nil).AnyTimes()
}

request := &spacemeshv2alpha1.MalfeasanceStreamRequest{
Expand Down
14 changes: 8 additions & 6 deletions api/grpcserver/v2alpha1/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,14 +332,15 @@ func EmitProposal(nodeID types.NodeID, layer types.LayerID, proposal types.Propo
&pb.Event_Proposal{
Proposal: &pb.EventProposal{
Layer: layer.Uint32(),
Proposal: proposal[:],
Proposal: proposal.Bytes(),
Smesher: nodeID.Bytes(),
},
},
)
}

func EmitOwnMalfeasanceProof(nodeID types.NodeID, proof []byte) {
// TODO(mafa): query malfeasance handler for data instead of extracting from proof bytes
const help = "Node committed malicious behavior. Identity will be canceled."
emitUserEvent(
help,
Expand Down Expand Up @@ -367,6 +368,9 @@ func emitUserEvent(help string, failure bool, details pb.IsEventDetails) {
}
}

// TODO (mafa): instead of passing along the proof bytes the API should query the malfeasance handler for the metadata
// of the proof if needed.
// The malfeasance handler should then take care of decoding the proof, caching if necessary and returning the metadata.
func ToMalfeasancePB(nodeID types.NodeID, proof []byte, includeProof bool) *pb.MalfeasanceProof {
mp := &wire.MalfeasanceProof{}
if err := codec.Decode(proof, mp); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion events/malfeasance.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
// EventMalfeasance includes the malfeasance proof.
type EventMalfeasance struct {
Smesher types.NodeID
Proof []byte
Proof []byte // TODO(mafa): remove this field and fetch metadata via malfeasance handler
}

// SubscribeMalfeasance subscribes malfeasance events.
Expand Down
9 changes: 7 additions & 2 deletions malfeasance/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,14 @@ func (h *Handler) countInvalidProof(p *wire.MalfeasanceProof) {
h.handlers[MalfeasanceType(p.Proof.Type)].ReportInvalidProof(numInvalidProofs)
}

func (h *Handler) Info(data []byte) (map[string]string, error) {
func (h *Handler) Info(ctx context.Context, nodeID types.NodeID) (map[string]string, error) {
var blob sql.Blob
if err := identities.LoadMalfeasanceBlob(ctx, h.cdb, nodeID.Bytes(), &blob); err != nil {
return nil, fmt.Errorf("load malfeasance proof: %w", err)
}

var p wire.MalfeasanceProof
if err := codec.Decode(data, &p); err != nil {
if err := codec.Decode(blob.Bytes, &p); err != nil {
return nil, fmt.Errorf("decode malfeasance proof: %w", err)
}
mh, ok := h.handlers[MalfeasanceType(p.Proof.Type)]
Expand Down
20 changes: 14 additions & 6 deletions malfeasance/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,11 +374,12 @@ func TestHandler_HandleSyncedMalfeasanceProof(t *testing.T) {
}

func TestHandler_Info(t *testing.T) {
t.Run("malformed data", func(t *testing.T) {
t.Run("unknown identity", func(t *testing.T) {
h := newHandler(t)

info, err := h.Info(types.RandomBytes(32))
require.ErrorContains(t, err, "decode malfeasance proof:")
info, err := h.Info(context.Background(), types.RandomNodeID())
require.ErrorContains(t, err, "load malfeasance proof:")
require.ErrorIs(t, err, sql.ErrNotFound)
require.Nil(t, info)
})

Expand All @@ -392,9 +393,11 @@ func TestHandler_Info(t *testing.T) {
Data: &wire.AtxProof{},
},
}
nodeID := types.RandomNodeID()
proofBytes := codec.MustEncode(proof)
identities.SetMalicious(h.db, nodeID, proofBytes, time.Now())

info, err := h.Info(proofBytes)
info, err := h.Info(context.Background(), nodeID)
require.ErrorContains(t, err, fmt.Sprintf("unknown malfeasance type %d", wire.MultipleATXs))
require.Nil(t, info)
})
Expand All @@ -414,9 +417,11 @@ func TestHandler_Info(t *testing.T) {
Data: &wire.AtxProof{},
},
}
nodeID := types.RandomNodeID()
proofBytes := codec.MustEncode(proof)
identities.SetMalicious(h.db, nodeID, proofBytes, time.Now())

info, err := h.Info(proofBytes)
info, err := h.Info(context.Background(), nodeID)
require.ErrorContains(t, err, "invalid proof")
require.Nil(t, info)
})
Expand All @@ -440,7 +445,10 @@ func TestHandler_Info(t *testing.T) {
Data: &wire.AtxProof{},
},
}
nodeID := types.RandomNodeID()
proofBytes := codec.MustEncode(proof)
identities.SetMalicious(h.db, nodeID, proofBytes, time.Now())

expectedProperties := map[string]string{
"domain": "0",
"type": strconv.FormatUint(uint64(wire.MultipleATXs), 10),
Expand All @@ -449,7 +457,7 @@ func TestHandler_Info(t *testing.T) {
expectedProperties[k] = v
}

info, err := h.Info(proofBytes)
info, err := h.Info(context.Background(), nodeID)
require.NoError(t, err)
require.Equal(t, expectedProperties, info)
})
Expand Down
2 changes: 1 addition & 1 deletion malfeasance2/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (p *Publisher) PublishATXProof(
if err := malfeasance.AddProof(p.cdb, id, nil, proof, byte(InvalidActivation), time.Now()); err != nil {
return fmt.Errorf("setting malfeasance proof: %w", err)
}
// TODO(mafa): cache proof
// TODO(mafa): cache proof, right now caching it would clash with legacy malfeasance proofs
// p.cdb.CacheMalfeasanceProof(id, proof)
p.tortoise.OnMalfeasance(id)
}
Expand Down

0 comments on commit 8c6bb28

Please sign in to comment.