Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cache validator signatures #415

Merged
merged 17 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
defaultAPIPort = uint16(8080)
defaultMetricsPort = uint16(9090)
defaultIntervalSeconds = uint64(10)
defaultSignatureCacheSize = uint64(1024 * 1024)
)

var defaultLogLevel = logging.Info.String()
Expand Down Expand Up @@ -61,6 +62,7 @@ type Config struct {
DestinationBlockchains []*DestinationBlockchain `mapstructure:"destination-blockchains" json:"destination-blockchains"`
ProcessMissedBlocks bool `mapstructure:"process-missed-blocks" json:"process-missed-blocks"`
DeciderURL string `mapstructure:"decider-url" json:"decider-url"`
SignatureCacheSize uint64 `mapstructure:"signature-cache-size" json:"signature-cache-size"`

// convenience field to fetch a blockchain's subnet ID
blockchainIDToSubnetID map[ids.ID]ids.ID
Expand Down
1 change: 1 addition & 0 deletions config/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ const (
ProcessMissedBlocksKey = "process-missed-blocks"
ManualWarpMessagesKey = "manual-warp-messages"
DBWriteIntervalSecondsKey = "db-write-interval-seconds"
SignatureCacheSizeKey = "signature-cache-size"
)
4 changes: 4 additions & 0 deletions config/viper.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func SetDefaultConfigValues(v *viper.Viper) {
v.SetDefault(APIPortKey, defaultAPIPort)
v.SetDefault(MetricsPortKey, defaultMetricsPort)
v.SetDefault(DBWriteIntervalSecondsKey, defaultIntervalSeconds)
v.SetDefault(
SignatureCacheSizeKey,
defaultSignatureCacheSize,
)
}

// BuildConfig constructs the relayer config using Viper.
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.27.9
github.com/aws/aws-sdk-go-v2/service/kms v1.35.3
github.com/ethereum/go-ethereum v1.13.8
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/onsi/ginkgo/v2 v2.20.0
github.com/onsi/gomega v1.34.1
github.com/pingcap/errors v0.11.4
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.20.0
github.com/redis/go-redis/v9 v9.6.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d h1:dg1dEPuWpEqDnvIw251EVy4zlP8gWbsGj4BsUKCRpYs=
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/holiman/billy v0.0.0-20230718173358-1c7e68d277a7 h1:3JQNjnMRil1yD0IfZKHF9GxxWKDJGj8I0IqOUol//sw=
Expand Down
7 changes: 6 additions & 1 deletion main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,19 @@ func main() {
panic(err)
}

signatureAggregator := aggregator.NewSignatureAggregator(
signatureAggregator, err := aggregator.NewSignatureAggregator(
network,
logger,
cfg.SignatureCacheSize,
sigAggMetrics.NewSignatureAggregatorMetrics(
prometheus.DefaultRegisterer,
),
messageCreator,
)
if err != nil {
logger.Fatal("Failed to create signature aggregator", zap.Error(err))
panic(err)
}

applicationRelayers, minHeights, err := createApplicationRelayers(
context.Background(),
Expand Down
97 changes: 83 additions & 14 deletions signature-aggregator/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/ava-labs/avalanchego/utils/set"
avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp"
"github.com/ava-labs/awm-relayer/peers"
"github.com/ava-labs/awm-relayer/signature-aggregator/aggregator/cache"
"github.com/ava-labs/awm-relayer/signature-aggregator/metrics"
"github.com/ava-labs/awm-relayer/utils"
coreEthMsg "github.com/ava-labs/coreth/plugin/evm/message"
Expand Down Expand Up @@ -58,33 +59,41 @@ type SignatureAggregator struct {
currentRequestID atomic.Uint32
subnetsMapLock sync.RWMutex
metrics *metrics.SignatureAggregatorMetrics
cache *cache.Cache
}

func NewSignatureAggregator(
network *peers.AppRequestNetwork,
logger logging.Logger,
signatureCacheSize uint64,
metrics *metrics.SignatureAggregatorMetrics,
messageCreator message.Creator,
) *SignatureAggregator {
) (*SignatureAggregator, error) {
cache, err := cache.NewCache(signatureCacheSize, logger)
if err != nil {
return nil, fmt.Errorf(
"failed to create signature cache: %w",
err,
)
}
sa := SignatureAggregator{
network: network,
subnetIDsByBlockchainID: map[ids.ID]ids.ID{},
logger: logger,
metrics: metrics,
messageCreator: messageCreator,
currentRequestID: atomic.Uint32{},
cache: cache,
}
sa.currentRequestID.Store(rand.Uint32())
return &sa
return &sa, nil
}

func (s *SignatureAggregator) CreateSignedMessage(
unsignedMessage *avalancheWarp.UnsignedMessage,
inputSigningSubnet ids.ID,
quorumPercentage uint64,
) (*avalancheWarp.Message, error) {
requestID := s.currentRequestID.Add(1)

var signingSubnet ids.ID
var err error
// If signingSubnet is not set we default to the subnet of the source blockchain
Expand All @@ -102,7 +111,6 @@ func (s *SignatureAggregator) CreateSignedMessage(
}

connectedValidators, err := s.network.ConnectToCanonicalValidators(signingSubnet)

if err != nil {
msg := "Failed to connect to canonical validators"
s.logger.Error(
Expand All @@ -113,6 +121,7 @@ func (s *SignatureAggregator) CreateSignedMessage(
s.metrics.FailuresToGetValidatorSet.Inc()
return nil, fmt.Errorf("%s: %w", msg, err)
}

if !utils.CheckStakeWeightPercentageExceedsThreshold(
big.NewInt(0).SetUint64(connectedValidators.ConnectedWeight),
connectedValidators.TotalValidatorWeight,
Expand All @@ -128,6 +137,39 @@ func (s *SignatureAggregator) CreateSignedMessage(
return nil, errNotEnoughConnectedStake
}

accumulatedSignatureWeight := big.NewInt(0)

signatureMap := make(map[int][bls.SignatureLen]byte)
if cachedSignatures, ok := s.cache.Get(unsignedMessage.ID()); ok {
for i, validator := range connectedValidators.ValidatorSet {
cachedSignature, found := cachedSignatures[cache.PublicKeyBytes(validator.PublicKeyBytes)]
if found {
signatureMap[i] = cachedSignature
accumulatedSignatureWeight.Add(
accumulatedSignatureWeight,
new(big.Int).SetUint64(validator.Weight),
)
}
}
s.metrics.SignatureCacheHits.Add(float64(len(signatureMap)))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reasoning behind incrementing based on the number of signatures (i.e. the cache value) rather than the number of key hits? Signature-based metrics would make it more difficult to identify appropriate cache sizes based on usage.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not my PR but not sure what do you mean by this. The signatureMap here is just the number of key hits from my understanding.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cache is keyed by warpMessageID, but we're adding metrics based on the size of the corresponding value. If the Warp message is signed by 1 or 1000 validators, it's still a cache hit or miss, but the measuring the number of signatures would tell a wildly different story than measuring the actual cache hit/miss rate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as discussed on standup, I left the metric in place on a per-signature basis. also, I changed the "cache miss" metric to reflect the number of signatures that need to be requested from the network, in 70b7722

}
if signedMsg, err := s.aggregateIfSufficientWeight(
unsignedMessage,
signatureMap,
accumulatedSignatureWeight,
connectedValidators,
quorumPercentage,
); err != nil {
return nil, err
} else if signedMsg != nil {
return signedMsg, nil
}
if len(signatureMap) > 0 {
s.metrics.SignatureCacheMisses.Add(float64(
len(connectedValidators.ValidatorSet) - len(signatureMap),
))
}

// TODO: remove this special handling and replace with ACP-118 interface once available
var reqBytes []byte
if sourceSubnet == constants.PrimaryNetworkID {
Expand All @@ -152,6 +194,7 @@ func (s *SignatureAggregator) CreateSignedMessage(
}

// Construct the AppRequest
requestID := s.currentRequestID.Add(1)
outMsg, err := s.messageCreator.AppRequest(
unsignedMessage.SourceChainID,
requestID,
Expand All @@ -169,9 +212,6 @@ func (s *SignatureAggregator) CreateSignedMessage(
}

// Query the validators with retries. On each retry, query one node per unique BLS pubkey
accumulatedSignatureWeight := big.NewInt(0)

signatureMap := make(map[int]blsSignatureBuf)
for attempt := 1; attempt <= maxRelayerQueryAttempts; attempt++ {
responsesExpected := len(connectedValidators.ValidatorSet) - len(signatureMap)
s.logger.Debug(
Expand Down Expand Up @@ -214,6 +254,7 @@ func (s *SignatureAggregator) CreateSignedMessage(
responseChan := s.network.RegisterRequestID(requestID, vdrSet.Len())

sentTo := s.network.Send(outMsg, vdrSet, sourceSubnet, subnets.NoOpAllower)
s.metrics.AppRequestCount.Inc()
s.logger.Debug(
"Sent signature request to network",
zap.String("warpMessageID", unsignedMessage.ID().String()),
Expand Down Expand Up @@ -329,7 +370,7 @@ func (s *SignatureAggregator) handleResponse(
requestID uint32,
connectedValidators *peers.ConnectedCanonicalValidators,
unsignedMessage *avalancheWarp.UnsignedMessage,
signatureMap map[int]blsSignatureBuf,
signatureMap map[int][bls.SignatureLen]byte,
accumulatedSignatureWeight *big.Int,
quorumPercentage uint64,
) (*avalancheWarp.Message, bool, error) {
Expand Down Expand Up @@ -369,6 +410,11 @@ func (s *SignatureAggregator) handleResponse(
zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
)
signatureMap[vdrIndex] = signature
s.cache.Add(
unsignedMessage.ID(),
cache.PublicKeyBytes(validator.PublicKeyBytes),
cache.SignatureBytes(signature),
)
accumulatedSignatureWeight.Add(accumulatedSignatureWeight, new(big.Int).SetUint64(validator.Weight))
} else {
s.logger.Debug(
Expand All @@ -382,14 +428,37 @@ func (s *SignatureAggregator) handleResponse(
return nil, true, nil
}

if signedMsg, err := s.aggregateIfSufficientWeight(
unsignedMessage,
signatureMap,
accumulatedSignatureWeight,
connectedValidators,
quorumPercentage,
); err != nil {
return nil, true, err
} else if signedMsg != nil {
return signedMsg, true, nil
}

// Not enough signatures, continue processing messages
return nil, true, nil
geoff-vball marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *SignatureAggregator) aggregateIfSufficientWeight(
unsignedMessage *avalancheWarp.UnsignedMessage,
signatureMap map[int][bls.SignatureLen]byte,
accumulatedSignatureWeight *big.Int,
connectedValidators *peers.ConnectedCanonicalValidators,
quorumPercentage uint64,
) (*avalancheWarp.Message, error) {
// As soon as the signatures exceed the stake weight threshold we try to aggregate and send the transaction.
if !utils.CheckStakeWeightPercentageExceedsThreshold(
accumulatedSignatureWeight,
connectedValidators.TotalValidatorWeight,
quorumPercentage,
) {
// Not enough signatures, continue processing messages
return nil, true, nil
return nil, nil
}
aggSig, vdrBitSet, err := s.aggregateSignatures(signatureMap)
if err != nil {
Expand All @@ -400,7 +469,7 @@ func (s *SignatureAggregator) handleResponse(
zap.String("warpMessageID", unsignedMessage.ID().String()),
zap.Error(err),
)
return nil, true, fmt.Errorf("%s: %w", msg, err)
return nil, fmt.Errorf("%s: %w", msg, err)
}

signedMsg, err := avalancheWarp.NewMessage(
Expand All @@ -418,9 +487,9 @@ func (s *SignatureAggregator) handleResponse(
zap.String("warpMessageID", unsignedMessage.ID().String()),
zap.Error(err),
)
return nil, true, fmt.Errorf("%s: %w", msg, err)
return nil, fmt.Errorf("%s: %w", msg, err)
}
return signedMsg, true, nil
return signedMsg, nil
}

// isValidSignatureResponse tries to generate a signature from the peer.AsyncResponse, then verifies
Expand Down Expand Up @@ -490,7 +559,7 @@ func (s *SignatureAggregator) isValidSignatureResponse(
// returns a bit set representing the validators that are represented in the aggregate signature. The bit
// set is in canonical validator order.
func (s *SignatureAggregator) aggregateSignatures(
signatureMap map[int]blsSignatureBuf,
signatureMap map[int][bls.SignatureLen]byte,
) (*bls.Signature, set.Bits, error) {
// Aggregate the signatures
signatures := make([]*bls.Signature, 0, len(signatureMap))
Expand Down
66 changes: 66 additions & 0 deletions signature-aggregator/aggregator/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package cache

import (
"math"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/crypto/bls"
"github.com/ava-labs/avalanchego/utils/logging"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/pingcap/errors"
"go.uber.org/zap"
)

type Cache struct {
cam-schultz marked this conversation as resolved.
Show resolved Hide resolved
logger logging.Logger

// map of warp message ID to a map of public keys to signatures
signatures *lru.Cache[ids.ID, map[PublicKeyBytes]SignatureBytes]
}

type PublicKeyBytes [bls.PublicKeyLen]byte
type SignatureBytes [bls.SignatureLen]byte

func NewCache(size uint64, logger logging.Logger) (*Cache, error) {
if size > math.MaxInt {
return nil, errors.New("cache size too big")
}

signatureCache, err := lru.New[ids.ID, map[PublicKeyBytes]SignatureBytes](int(size))
if err != nil {
return nil, err
}

return &Cache{
signatures: signatureCache,
logger: logger,
}, nil
}

func (c *Cache) Get(msgID ids.ID) (map[PublicKeyBytes]SignatureBytes, bool) {
cachedValue, isCached := c.signatures.Get(msgID)

if isCached {
c.logger.Debug("cache hit", zap.Stringer("msgID", msgID))
return cachedValue, true
} else {
c.logger.Debug("cache miss", zap.Stringer("msgID", msgID))
return nil, false
}
}

func (c *Cache) Add(
msgID ids.ID,
pubKey PublicKeyBytes,
signature SignatureBytes,
) {
var (
sigs map[PublicKeyBytes]SignatureBytes
ok bool
)
if sigs, ok = c.Get(msgID); !ok {
sigs = make(map[PublicKeyBytes]SignatureBytes)
}
sigs[pubKey] = signature
c.signatures.Add(msgID, sigs)
}
2 changes: 2 additions & 0 deletions signature-aggregator/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func signatureAggregationAPIHandler(
zap.String("input", req.SigningSubnetID),
)
writeJSONError(logger, w, msg)
return
}
}

Expand All @@ -147,6 +148,7 @@ func signatureAggregationAPIHandler(
msg := "Failed to aggregate signatures"
logger.Warn(msg, zap.Error(err))
writeJSONError(logger, w, msg)
return
geoff-vball marked this conversation as resolved.
Show resolved Hide resolved
}
resp, err := json.Marshal(
AggregateSignatureResponse{
Expand Down
Loading
Loading