Skip to content

Commit

Permalink
Extend publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
fasmat committed Dec 4, 2024
1 parent 9379bf7 commit ffde644
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 20 deletions.
2 changes: 1 addition & 1 deletion malfeasance/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (p *Publisher) PublishProof(ctx context.Context, smesherID types.NodeID, pr

// Only gossip the proof if we are synced (to not spam the network with proofs others probably already have).
if !p.sync.ListenToATXGossip() {
p.logger.Debug("not synced, not broadcasting malfeasance proof",
p.logger.Debug("not in sync, not broadcasting malfeasance proof",
zap.String("smesher_id", smesherID.ShortString()),
)
return nil
Expand Down
4 changes: 4 additions & 0 deletions malfeasance2/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (

//go:generate mockgen -typed -package=malfeasance2 -destination=./mocks.go -source=./interface.go

type syncer interface {
ListenToATXGossip() bool
}

type tortoise interface {
OnMalfeasance(types.NodeID)
}
Expand Down
108 changes: 90 additions & 18 deletions malfeasance2/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package malfeasance2

import (
"context"
"errors"
"fmt"
"time"

Expand All @@ -11,66 +12,137 @@ import (
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/p2p/pubsub"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/malfeasance"
"github.com/spacemeshos/go-spacemesh/sql/marriage"
)

type Publisher struct {
logger *zap.Logger
cdb *datastore.CachedDB
sync syncer
tortoise tortoise
publisher pubsub.Publisher
}

func NewPublisher(
logger *zap.Logger,
cdb *datastore.CachedDB,
sync syncer,
tortoise tortoise,
publisher pubsub.Publisher,
) *Publisher {
return &Publisher{
logger: logger,
cdb: cdb,
sync: sync,
tortoise: tortoise,
publisher: publisher,
}
}

func (p *Publisher) PublishATXProof(
ctx context.Context,
nodeID types.NodeID,
proof []byte,
) error {
// Combine IDs from the present equivocation set for atx.SmesherID and IDs in atx.Marriages.
allMalicious := make(map[types.NodeID]struct{})

func (p *Publisher) PublishATXProof(ctx context.Context, nodeID types.NodeID, proof []byte) error {
marriageID, err := marriage.FindIDByNodeID(p.cdb, nodeID)
if err != nil {
switch {
case errors.Is(err, sql.ErrNotFound): // smesher is not married
malicious, err := malfeasance.IsMalicious(p.cdb, nodeID)
if err != nil {
return fmt.Errorf("check if smesher is malicious: %w", err)
}
if malicious {
p.logger.Debug("smesher is already marked as malicious", zap.String("smesher_id", nodeID.ShortString()))
return nil
}
if err := malfeasance.AddProof(p.cdb, nodeID, nil, proof, byte(InvalidActivation), time.Now()); err != nil {
return fmt.Errorf("setting malfeasance proof: %w", err)
}
// TODO(mafa): cache proof, right now caching it would clash with legacy malfeasance proofs
// arguably this shouldn't be needed at all, API queries the handler for info about a proof
// handler can decided if this needs caching or not
//
// p.cdb.CacheMalfeasanceProof(nodeID, proof)
p.tortoise.OnMalfeasance(nodeID)

return p.publish(ctx, nodeID, nil, proof) // pass nil for certificates
case err != nil:
return fmt.Errorf("getting equivocation set: %w", err)
default: // smesher is married
}

// Combine IDs from the present equivocation set for atx.SmesherID and IDs in atx.Marriages.
set, err := marriage.NodeIDsByID(p.cdb, marriageID)
if err != nil {
return fmt.Errorf("getting equivocation set: %w", err)
}
for _, id := range set {
allMalicious[id] = struct{}{}
}

for id := range allMalicious {
if err := malfeasance.AddProof(p.cdb, id, nil, proof, byte(InvalidActivation), time.Now()); err != nil {
publish := false // whether to publish the proof
malicious, err := malfeasance.IsMalicious(p.cdb, nodeID)
if err != nil {
return fmt.Errorf("check if smesher is malicious: %w", err)
}
if !malicious {
err := malfeasance.AddProof(p.cdb, nodeID, &marriageID, proof, byte(InvalidActivation), time.Now())
if err != nil {
return fmt.Errorf("setting malfeasance proof: %w", err)
}
// TODO(mafa): cache proof, right now caching it would clash with legacy malfeasance proofs
// arguably this shouldn't be needed at all, API queries the handler for info about a proof
// handler can decided if this needs caching or not
//
// p.cdb.CacheMalfeasanceProof(nodeID, proof)
p.tortoise.OnMalfeasance(nodeID)

publish = true
}

for _, id := range set {
if id == nodeID {
// already handled
continue
}
malicious, err := malfeasance.IsMalicious(p.cdb, id)
if err != nil {
return fmt.Errorf("check if smesher is malicious: %w", err)
}
if malicious {
p.logger.Debug("smesher is already marked as malicious", zap.String("smesher_id", id.ShortString()))
continue
}

publish = true
if err := malfeasance.SetMalicious(p.cdb, id, marriageID, time.Now()); err != nil {
return fmt.Errorf("setting malicious: %w", err)
}
// TODO(mafa): cache proof, right now caching it would clash with legacy malfeasance proofs
// arguably this shouldn't be needed at all, API queries the handler for info about a proof
// handler can decided if this needs caching or not
//
// p.cdb.CacheMalfeasanceProof(id, proof)
p.tortoise.OnMalfeasance(id)
}

// TODO(mafa): check if we are in sync before publishing, if not just return
if !publish {
// all smeshers were already marked as malicious - no gossip to void spamming the network
return nil
}

return p.publish(ctx, nodeID, nil, proof) // TODO(mafa): do not pass nil here for certificates
}

func (p *Publisher) publish(ctx context.Context, nodeID types.NodeID, certs []ProofCertificate, proof []byte) error {
// Only gossip the proof if we are synced (to not spam the network with proofs others probably already have).
if !p.sync.ListenToATXGossip() {
p.logger.Debug("not in sync, not broadcasting malfeasance proof",
zap.String("smesher_id", nodeID.ShortString()),
)
return nil
}

malfeasanceProof := &MalfeasanceProof{
Version: 0,
Domain: InvalidActivation,
Proof: proof,
Version: 0,
Certificates: certs,
Domain: InvalidActivation,
Proof: proof,
}
if err := p.publisher.Publish(ctx, pubsub.MalfeasanceProof2, codec.MustEncode(malfeasanceProof)); err != nil {
p.logger.Error("failed to broadcast malfeasance proof", zap.Error(err))
Expand Down
1 change: 1 addition & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,7 @@ func (app *App) initServices(ctx context.Context) error {
malfeasance2Publisher := malfeasance2.NewPublisher(
app.addLogger(Malfeasance2Logger, lg).Zap(),
app.cachedDB,
syncer,
trtl,
app.host,
)
Expand Down
2 changes: 1 addition & 1 deletion sql/marriage/marriages.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func NodeIDsByID(db sql.Executor, id ID) ([]types.NodeID, error) {
s.BindInt64(1, int64(id))
}, func(s *sql.Statement) bool {
var nodeID types.NodeID
s.ColumnBytes(0, nodeID[:])
s.ColumnBytes(0, nodeID.Bytes())
nodeIDs = append(nodeIDs, nodeID)
return true
})
Expand Down

0 comments on commit ffde644

Please sign in to comment.