Skip to content

Commit

Permalink
Merge pull request #344 from ava-labs/decider
Browse files Browse the repository at this point in the history
`ShouldSendMessage` decider
  • Loading branch information
feuGeneA authored Jul 26, 2024
2 parents 7ca00fd + f8662fd commit 6362bca
Show file tree
Hide file tree
Showing 24 changed files with 738 additions and 9 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,10 @@ jobs:
with:
submodules: recursive

- name: Install buf
uses: bufbuild/buf-setup-action@v1.31.0
with:
github_token: ${{ github.token }}

- name: Run E2E Tests
run: AVALANCHEGO_BUILD_PATH=/tmp/e2e-test/avalanchego DATA_DIR=/tmp/e2e-test/data ./scripts/e2e_test.sh
10 changes: 10 additions & 0 deletions .github/workflows/linter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,15 @@ jobs:
with:
go-version-file: 'go.mod'

- name: Install buf
uses: bufbuild/buf-setup-action@v1.31.0

- name: Run Lint
run: ./scripts/lint.sh --go-lint

- name: Ensure protobuf changes are checked in
run: |
scripts/protobuf_codegen.sh
git update-index --really-refresh >> /dev/null
git diff-index HEAD # to show the differences
git diff-index --quiet HEAD || (echo 'protobuf generated code changes have not all been checked in' && exit 1)
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
build/
__debug_bin
tests/cmd/decider/decider

.vscode*

Expand Down
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ awm-relayer --help Display awm-relayer usag

### Building

Before building, be sure to install Go, which is required even if you're just building the Docker image.
Before building, be sure to install Go, which is required even if you're just building the Docker image. You'll also need to install [buf](github.com/bufbuild/buf/).

Build the relayer by running the script:

Expand Down Expand Up @@ -291,6 +291,10 @@ The relayer is configured via a JSON file, the path to which is passed in via th

- The AWS region in which the KMS key is located. Required if `kms-key-id` is provided.

`"decider-url": string`

- The URL of a service implementing the gRPC service defined by `proto/decider`, which will be queried for each message to determine whether that message should be relayed.

## Architecture

### Components
Expand Down
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"net/url"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/constants"
Expand Down Expand Up @@ -58,6 +59,7 @@ type Config struct {
SourceBlockchains []*SourceBlockchain `mapstructure:"source-blockchains" json:"source-blockchains"`
DestinationBlockchains []*DestinationBlockchain `mapstructure:"destination-blockchains" json:"destination-blockchains"`
ProcessMissedBlocks bool `mapstructure:"process-missed-blocks" json:"process-missed-blocks"`
DeciderURL string `mapstructure:"decider-url" json:"decider-url"`

// convenience field to fetch a blockchain's subnet ID
blockchainIDToSubnetID map[ids.ID]ids.ID
Expand Down Expand Up @@ -119,6 +121,12 @@ func (c *Config) Validate() error {
}
c.blockchainIDToSubnetID = blockchainIDToSubnetID

if len(c.DeciderURL) != 0 {
if _, err := url.ParseRequestURI(c.DeciderURL); err != nil {
return fmt.Errorf("Invalid decider URL: %w", err)
}
}

return nil
}

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ require (
github.com/stretchr/testify v1.9.0
go.uber.org/mock v0.4.0
go.uber.org/zap v1.27.0
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.2
)

require (
Expand Down Expand Up @@ -158,8 +160,6 @@ require (
golang.org/x/text v0.15.0 // indirect
golang.org/x/time v0.3.0 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/grpc v1.64.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
46 changes: 45 additions & 1 deletion main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"log"
"net/http"
"os"
"runtime"
"strings"

"github.com/ava-labs/avalanchego/api/metrics"
Expand All @@ -33,6 +34,8 @@ import (
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

var version = "v0.0.0-dev"
Expand Down Expand Up @@ -174,7 +177,20 @@ func main() {

relayerHealth := createHealthTrackers(&cfg)

messageHandlerFactories, err := createMessageHandlerFactories(logger, &cfg)
deciderConnection, err := createDeciderConnection(cfg.DeciderURL)
if err != nil {
logger.Fatal(
"Failed to instantiate decider connection",
zap.Error(err),
)
panic(err)
}

messageHandlerFactories, err := createMessageHandlerFactories(
logger,
&cfg,
deciderConnection,
)
if err != nil {
logger.Fatal("Failed to create message handler factories", zap.Error(err))
panic(err)
Expand Down Expand Up @@ -240,6 +256,7 @@ func main() {
func createMessageHandlerFactories(
logger logging.Logger,
globalConfig *config.Config,
deciderConnection *grpc.ClientConn,
) (map[ids.ID]map[common.Address]messages.MessageHandlerFactory, error) {
messageHandlerFactories := make(map[ids.ID]map[common.Address]messages.MessageHandlerFactory)
for _, sourceBlockchain := range globalConfig.SourceBlockchains {
Expand All @@ -258,6 +275,7 @@ func createMessageHandlerFactories(
logger,
address,
cfg,
deciderConnection,
)
case config.OFF_CHAIN_REGISTRY:
m, err = offchainregistry.NewMessageHandlerFactory(
Expand Down Expand Up @@ -432,6 +450,32 @@ func createApplicationRelayersForSourceChain(
return applicationRelayers, minHeight, nil
}

// create a connection to the "should send message" decider service.
// if url is unspecified, returns a nil client pointer
func createDeciderConnection(url string) (*grpc.ClientConn, error) {
if len(url) == 0 {
return nil, nil
}

connection, err := grpc.NewClient(
url,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, fmt.Errorf(
"Failed to instantiate grpc client: %w",
err,
)
}

runtime.SetFinalizer(
connection,
func(c *grpc.ClientConn) { c.Close() },
)

return connection, nil
}

func createHealthTrackers(cfg *config.Config) map[ids.ID]*atomic.Bool {
healthTrackers := make(map[ids.ID]*atomic.Bool, len(cfg.SourceBlockchains))
for _, sourceBlockchain := range cfg.SourceBlockchains {
Expand Down
70 changes: 69 additions & 1 deletion messages/teleporter/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
warpPayload "github.com/ava-labs/avalanchego/vms/platformvm/warp/payload"
"github.com/ava-labs/awm-relayer/config"
"github.com/ava-labs/awm-relayer/messages"
pbDecider "github.com/ava-labs/awm-relayer/proto/pb/decider"
"github.com/ava-labs/awm-relayer/utils"
"github.com/ava-labs/awm-relayer/vms"
"github.com/ava-labs/subnet-evm/accounts/abi/bind"
Expand All @@ -25,25 +26,40 @@ import (
teleporterUtils "github.com/ava-labs/teleporter/utils/teleporter-utils"
"github.com/ethereum/go-ethereum/common"
"go.uber.org/zap"
"google.golang.org/grpc"
)

type factory struct {
messageConfig Config
protocolAddress common.Address
logger logging.Logger
deciderClient pbDecider.DeciderServiceClient
}

type messageHandler struct {
logger logging.Logger
teleporterMessage *teleportermessenger.TeleporterMessage
unsignedMessage *warp.UnsignedMessage
factory *factory
deciderClient pbDecider.DeciderServiceClient
}

// define an "empty" decider client to use when a connection isn't provided:
type emptyDeciderClient struct{}

func (s *emptyDeciderClient) ShouldSendMessage(
_ context.Context,
_ *pbDecider.ShouldSendMessageRequest,
_ ...grpc.CallOption,
) (*pbDecider.ShouldSendMessageResponse, error) {
return &pbDecider.ShouldSendMessageResponse{ShouldSendMessage: true}, nil
}

func NewMessageHandlerFactory(
logger logging.Logger,
messageProtocolAddress common.Address,
messageProtocolConfig config.MessageProtocolConfig,
deciderClientConn *grpc.ClientConn,
) (messages.MessageHandlerFactory, error) {
// Marshal the map and unmarshal into the Teleporter config
data, err := json.Marshal(messageProtocolConfig.Settings)
Expand All @@ -65,10 +81,18 @@ func NewMessageHandlerFactory(
return nil, err
}

var deciderClient pbDecider.DeciderServiceClient
if deciderClientConn == nil {
deciderClient = &emptyDeciderClient{}
} else {
deciderClient = pbDecider.NewDeciderServiceClient(deciderClientConn)
}

return &factory{
messageConfig: messageConfig,
protocolAddress: messageProtocolAddress,
logger: logger,
deciderClient: deciderClient,
}, nil
}

Expand All @@ -86,6 +110,7 @@ func (f *factory) NewMessageHandler(unsignedMessage *warp.UnsignedMessage) (mess
teleporterMessage: teleporterMessage,
unsignedMessage: unsignedMessage,
factory: f,
deciderClient: f.deciderClient,
}, nil
}

Expand Down Expand Up @@ -166,8 +191,51 @@ func (m *messageHandler) ShouldSendMessage(destinationClient vms.DestinationClie
)
return false, nil
}
// Dispatch to the external decider service. If the service is unavailable or returns
// an error, then use the decision that has already been made, i.e. return true
decision, err := m.getShouldSendMessageFromDecider()
if err != nil {
m.logger.Warn(
"Error delegating to decider",
zap.String("warpMessageID", m.unsignedMessage.ID().String()),
zap.String("teleporterMessageID", teleporterMessageID.String()),
)
return true, nil
}
if !decision {
m.logger.Info(
"Decider rejected message",
zap.String("warpMessageID", m.unsignedMessage.ID().String()),
zap.String("teleporterMessageID", teleporterMessageID.String()),
zap.String("destinationBlockchainID", destinationBlockchainID.String()),
)
}
return decision, nil
}

// Queries the decider service to determine whether this message should be
// sent. If the decider client is nil, returns true.
func (m *messageHandler) getShouldSendMessageFromDecider() (bool, error) {
warpMsgID := m.unsignedMessage.ID()

ctx, cancelCtx := context.WithTimeout(context.Background(), 30*time.Second)
defer cancelCtx()
response, err := m.deciderClient.ShouldSendMessage(
ctx,
&pbDecider.ShouldSendMessageRequest{
NetworkId: m.unsignedMessage.NetworkID,
SourceChainId: m.unsignedMessage.SourceChainID[:],
Payload: m.unsignedMessage.Payload,
BytesRepresentation: m.unsignedMessage.Bytes(),
Id: warpMsgID[:],
},
)
if err != nil {
m.logger.Error("Error response from decider.", zap.Error(err))
return false, err
}

return true, nil
return response.ShouldSendMessage, nil
}

// SendMessage extracts the gasLimit and packs the call data to call the receiveCrossChainMessage
Expand Down
1 change: 1 addition & 0 deletions messages/teleporter/message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func TestShouldSendMessage(t *testing.T) {
logger,
messageProtocolAddress,
messageProtocolConfig,
nil,
)
require.NoError(t, err)
messageHandler, err := factory.NewMessageHandler(test.warpUnsignedMessage)
Expand Down
11 changes: 11 additions & 0 deletions proto/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Protobuf linting and generation for this project is managed by
[buf](https://github.com/bufbuild/buf).

Please find installation instructions at
[https://docs.buf.build/installation/](https://docs.buf.build/installation/).

When changes are made to the proto definition, the generated source code can be updated by running
`protobuf_codegen.sh` located in the `scripts/` directory of this repo.

Introduction to `buf`
[https://docs.buf.build/tour/introduction](https://docs.buf.build/tour/introduction)
8 changes: 8 additions & 0 deletions proto/buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
version: v1
plugins:
- name: go
out: pb
opt: paths=source_relative
- name: go-grpc
out: pb
opt: paths=source_relative
8 changes: 8 additions & 0 deletions proto/buf.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Generated by buf. DO NOT EDIT.
version: v1
deps:
- remote: buf.build
owner: prometheus
repository: client-model
commit: e171c0b235c546d5a9a597c2961bd357
digest: shake256:7db3f73ac0f1dce71e70f304f318e9741e857fd78b7b42f0df7a3da353fbb2f387899da7b0a77ac9ee9565194510e39a913cdb9a8ab3c2ff4b8713428c795213
10 changes: 10 additions & 0 deletions proto/buf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version: v1
name: buf.build/ava-labs/awm-relayer
breaking:
use:
- FILE
lint:
use:
- DEFAULT
except:
- PACKAGE_VERSION_SUFFIX # versioned naming <service>.v1beta
21 changes: 21 additions & 0 deletions proto/decider/decider.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
syntax = "proto3";

package decider;

option go_package = "github.com/ava-labs/awm-relayer/proto/pb/decider";

service DeciderService {
rpc ShouldSendMessage(ShouldSendMessageRequest) returns (ShouldSendMessageResponse);
}

message ShouldSendMessageRequest {
uint32 network_id = 1;
bytes source_chain_id = 2;
bytes payload = 3;
bytes bytes_representation = 4;
bytes id = 5;
}

message ShouldSendMessageResponse {
bool should_send_message = 1;
}
Loading

0 comments on commit 6362bca

Please sign in to comment.