Skip to content

Commit

Permalink
Functions: connection handler (#9693)
Browse files Browse the repository at this point in the history
* Functions: connection handler prep work

* S4 methods

* FunctionsConnector tests

* Fixed broken test

* Addressed PR feedback

* Allowlist refactor

* Addressed PR feedback
  • Loading branch information
Andrei Smirnov authored Jun 28, 2023
1 parent e4e6475 commit 369ce8c
Show file tree
Hide file tree
Showing 18 changed files with 708 additions and 150 deletions.
161 changes: 151 additions & 10 deletions core/services/functions/connector_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,48 @@ package functions
import (
"context"
"crypto/ecdsa"
"encoding/json"
"fmt"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/common"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions"
"github.com/smartcontractkit/chainlink/v2/core/services/s4"
"github.com/smartcontractkit/chainlink/v2/core/utils"

ethCommon "github.com/ethereum/go-ethereum/common"
)

type functionsConnectorHandler struct {
connector connector.GatewayConnector
signerKey *ecdsa.PrivateKey
lggr logger.Logger
utils.StartStopOnce

connector connector.GatewayConnector
signerKey *ecdsa.PrivateKey
nodeAddress string
storage s4.Storage
allowlist functions.OnchainAllowlist
lggr logger.Logger
}

const (
methodSecretsSet = "secrets_set"
methodSecretsList = "secrets_list"
)

var (
_ connector.Signer = &functionsConnectorHandler{}
_ connector.GatewayConnectorHandler = &functionsConnectorHandler{}
)

func NewFunctionsConnectorHandler(signerKey *ecdsa.PrivateKey, lggr logger.Logger) *functionsConnectorHandler {
func NewFunctionsConnectorHandler(nodeAddress string, signerKey *ecdsa.PrivateKey, storage s4.Storage, allowlist functions.OnchainAllowlist, lggr logger.Logger) *functionsConnectorHandler {
return &functionsConnectorHandler{
signerKey: signerKey,
lggr: lggr,
nodeAddress: nodeAddress,
signerKey: signerKey,
storage: storage,
allowlist: allowlist,
lggr: lggr.Named("functionsConnectorHandler"),
}
}

Expand All @@ -36,14 +56,135 @@ func (h *functionsConnectorHandler) Sign(data ...[]byte) ([]byte, error) {
return common.SignData(h.signerKey, data...)
}

func (h *functionsConnectorHandler) HandleGatewayMessage(gatewayId string, msg *api.Message) {
h.lggr.Debugw("functionsConnectorHandler: received message from gateway", "id", gatewayId)
func (h *functionsConnectorHandler) HandleGatewayMessage(ctx context.Context, gatewayId string, body *api.MessageBody) {
fromAddr := ethCommon.HexToAddress(body.Sender)
if !h.allowlist.Allow(fromAddr) {
h.lggr.Errorw("allowlist prevented the request from this address", "id", gatewayId, "address", fromAddr)
return
}

h.lggr.Debugw("handling gateway request", "id", gatewayId, "method", body.Method)

switch body.Method {
case methodSecretsList:
h.handleSecretsList(ctx, gatewayId, body, fromAddr)
case methodSecretsSet:
h.handleSecretsSet(ctx, gatewayId, body, fromAddr)
default:
h.lggr.Errorw("unsupported method", "id", gatewayId, "method", body.Method)
}
}

func (h *functionsConnectorHandler) Start(ctx context.Context) error {
return nil
return h.StartOnce("FunctionsConnectorHandler", func() error {
return h.allowlist.Start(ctx)
})
}

func (h *functionsConnectorHandler) Close() error {
return nil
return h.StopOnce("FunctionsConnectorHandler", func() error {
return h.allowlist.Close()
})
}

func (h *functionsConnectorHandler) handleSecretsList(ctx context.Context, gatewayId string, body *api.MessageBody, fromAddr ethCommon.Address) {
type ListRow struct {
SlotID uint `json:"slot_id"`
Version uint64 `json:"version"`
Expiration int64 `json:"expiration"`
}

type ListResponse struct {
Success bool `json:"success"`
ErrorMessage string `json:"error_message,omitempty"`
Rows []ListRow `json:"rows,omitempty"`
}

var response ListResponse
snapshot, err := h.storage.List(ctx, fromAddr)
if err == nil {
response.Success = true
response.Rows = make([]ListRow, len(snapshot))
for i, row := range snapshot {
response.Rows[i] = ListRow{
SlotID: row.SlotId,
Version: row.Version,
Expiration: row.Expiration,
}
}
} else {
response.ErrorMessage = fmt.Sprintf("Failed to list secrets: %v", err)
}

if err := h.sendResponse(ctx, gatewayId, body, response); err != nil {
h.lggr.Errorw("failed to send response to gateway", "id", gatewayId, "error", err)
}
}

func (h *functionsConnectorHandler) handleSecretsSet(ctx context.Context, gatewayId string, body *api.MessageBody, fromAddr ethCommon.Address) {
type SetRequest struct {
SlotID uint `json:"slot_id"`
Version uint64 `json:"version"`
Expiration int64 `json:"expiration"`
Payload []byte `json:"payload"`
Signature []byte `json:"signature"`
}

type SetResponse struct {
Success bool `json:"success"`
ErrorMessage string `json:"error_message,omitempty"`
}

var request SetRequest
var response SetResponse
err := json.Unmarshal(body.Payload, &request)
if err == nil {
key := s4.Key{
Address: fromAddr,
SlotId: request.SlotID,
Version: request.Version,
}
record := s4.Record{
Expiration: request.Expiration,
Payload: request.Payload,
}
err = h.storage.Put(ctx, &key, &record, request.Signature)
if err == nil {
response.Success = true
} else {
response.ErrorMessage = fmt.Sprintf("Failed to set secret: %v", err)
}
} else {
response.ErrorMessage = fmt.Sprintf("Bad request to set secret: %v", err)
}

if err := h.sendResponse(ctx, gatewayId, body, response); err != nil {
h.lggr.Errorw("failed to send response to gateway", "id", gatewayId, "error", err)
}
}

func (h *functionsConnectorHandler) sendResponse(ctx context.Context, gatewayId string, requestBody *api.MessageBody, payload any) error {
payloadJson, err := json.Marshal(payload)
if err != nil {
return err
}

msg := &api.Message{
Body: api.MessageBody{
MessageId: requestBody.MessageId,
DonId: requestBody.DonId,
Method: requestBody.Method,
Sender: h.nodeAddress,
Payload: payloadJson,
},
}
if err = msg.Sign(h.signerKey); err != nil {
return err
}

err = h.connector.SendToGateway(ctx, gatewayId, msg)
if err == nil {
h.lggr.Debugw("sent to gateway", "id", gatewayId, "messageId", requestBody.MessageId, "donId", requestBody.DonId, "method", requestBody.Method)
}
return err
}
197 changes: 197 additions & 0 deletions core/services/functions/connector_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package functions_test

import (
"encoding/base64"
"encoding/json"
"errors"
"testing"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/functions"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/common"
gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks"
gfmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/s4"
s4mocks "github.com/smartcontractkit/chainlink/v2/core/services/s4/mocks"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestFunctionsConnectorHandler(t *testing.T) {
t.Parallel()

logger := logger.TestLogger(t)
privateKey, addr := testutils.NewPrivateKeyAndAddress(t)
storage := s4mocks.NewStorage(t)
connector := gcmocks.NewGatewayConnector(t)
allowlist := gfmocks.NewOnchainAllowlist(t)
allowlist.On("Start", mock.Anything).Return(nil)
allowlist.On("Close", mock.Anything).Return(nil)
handler := functions.NewFunctionsConnectorHandler(addr.Hex(), privateKey, storage, allowlist, logger)
require.NotNil(t, handler)

handler.SetConnector(connector)

err := handler.Start(testutils.Context(t))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, handler.Close())
})

t.Run("Sign", func(t *testing.T) {
signature, err := handler.Sign([]byte("test"))
require.NoError(t, err)

signer, err := common.ValidateSignature(signature, []byte("test"))
require.NoError(t, err)
require.Equal(t, addr.Bytes(), signer)
})

t.Run("HandleGatewayMessage", func(t *testing.T) {
t.Run("secrets_list", func(t *testing.T) {
msg := api.Message{
Body: api.MessageBody{
DonId: "fun4",
MessageId: "1",
Method: "secrets_list",
Sender: addr.Hex(),
},
}
require.NoError(t, msg.Sign(privateKey))

ctx := testutils.Context(t)
snapshot := []*s4.SnapshotRow{
{SlotId: 1, Version: 1, Expiration: 1},
{SlotId: 2, Version: 2, Expiration: 2},
}
storage.On("List", ctx, addr).Return(snapshot, nil).Once()
allowlist.On("Allow", addr).Return(true).Once()
connector.On("SendToGateway", ctx, "gw1", mock.Anything).Run(func(args mock.Arguments) {
msg, ok := args[2].(*api.Message)
require.True(t, ok)
require.Equal(t, `{"success":true,"rows":[{"slot_id":1,"version":1,"expiration":1},{"slot_id":2,"version":2,"expiration":2}]}`, string(msg.Body.Payload))

}).Return(nil).Once()

handler.HandleGatewayMessage(ctx, "gw1", &msg.Body)

t.Run("orm error", func(t *testing.T) {
storage.On("List", ctx, addr).Return(nil, errors.New("boom")).Once()
allowlist.On("Allow", addr).Return(true).Once()
connector.On("SendToGateway", ctx, "gw1", mock.Anything).Run(func(args mock.Arguments) {
msg, ok := args[2].(*api.Message)
require.True(t, ok)
require.Equal(t, `{"success":false,"error_message":"Failed to list secrets: boom"}`, string(msg.Body.Payload))

}).Return(nil).Once()

handler.HandleGatewayMessage(ctx, "gw1", &msg.Body)
})

t.Run("not allowed", func(t *testing.T) {
allowlist.On("Allow", addr).Return(false).Once()
handler.HandleGatewayMessage(ctx, "gw1", &msg.Body)
})
})

t.Run("secrets_set", func(t *testing.T) {
ctx := testutils.Context(t)
key := s4.Key{
Address: addr,
SlotId: 3,
Version: 4,
}
record := s4.Record{
Expiration: 5,
Payload: []byte("test"),
}
signature, err := s4.NewEnvelopeFromRecord(&key, &record).Sign(privateKey)
signatureB64 := base64.StdEncoding.EncodeToString(signature)
require.NoError(t, err)

msg := api.Message{
Body: api.MessageBody{
DonId: "fun4",
MessageId: "1",
Method: "secrets_set",
Sender: addr.Hex(),
Payload: json.RawMessage(`{"slot_id":3,"version":4,"expiration":5,"payload":"dGVzdA==","signature":"` + signatureB64 + `"}`),
},
}
require.NoError(t, msg.Sign(privateKey))

storage.On("Put", ctx, &key, &record, signature).Return(nil).Once()
allowlist.On("Allow", addr).Return(true).Once()
connector.On("SendToGateway", ctx, "gw1", mock.Anything).Run(func(args mock.Arguments) {
msg, ok := args[2].(*api.Message)
require.True(t, ok)
require.Equal(t, `{"success":true}`, string(msg.Body.Payload))

}).Return(nil).Once()

handler.HandleGatewayMessage(ctx, "gw1", &msg.Body)

t.Run("orm error", func(t *testing.T) {
storage.On("Put", ctx, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("boom")).Once()
allowlist.On("Allow", addr).Return(true).Once()
connector.On("SendToGateway", ctx, "gw1", mock.Anything).Run(func(args mock.Arguments) {
msg, ok := args[2].(*api.Message)
require.True(t, ok)
require.Equal(t, `{"success":false,"error_message":"Failed to set secret: boom"}`, string(msg.Body.Payload))

}).Return(nil).Once()

handler.HandleGatewayMessage(ctx, "gw1", &msg.Body)
})

t.Run("missing signature", func(t *testing.T) {
msg.Body.Payload = json.RawMessage(`{"slot_id":3,"version":4,"expiration":5,"payload":"dGVzdA=="}`)
require.NoError(t, msg.Sign(privateKey))
storage.On("Put", ctx, mock.Anything, mock.Anything, mock.Anything).Return(s4.ErrWrongSignature).Once()
allowlist.On("Allow", addr).Return(true).Once()
connector.On("SendToGateway", ctx, "gw1", mock.Anything).Run(func(args mock.Arguments) {
msg, ok := args[2].(*api.Message)
require.True(t, ok)
require.Equal(t, `{"success":false,"error_message":"Failed to set secret: wrong signature"}`, string(msg.Body.Payload))

}).Return(nil).Once()

handler.HandleGatewayMessage(ctx, "gw1", &msg.Body)
})

t.Run("malformed request", func(t *testing.T) {
msg.Body.Payload = json.RawMessage(`{sdfgdfgoscsicosd:sdf:::sdf ::; xx}`)
require.NoError(t, msg.Sign(privateKey))
allowlist.On("Allow", addr).Return(true).Once()
connector.On("SendToGateway", ctx, "gw1", mock.Anything).Run(func(args mock.Arguments) {
msg, ok := args[2].(*api.Message)
require.True(t, ok)
require.Equal(t, `{"success":false,"error_message":"Bad request to set secret: invalid character 's' looking for beginning of object key string"}`, string(msg.Body.Payload))

}).Return(nil).Once()

handler.HandleGatewayMessage(ctx, "gw1", &msg.Body)
})
})

t.Run("unsupported method", func(t *testing.T) {
msg := api.Message{
Body: api.MessageBody{
DonId: "fun4",
MessageId: "1",
Method: "foobar",
Sender: addr.Hex(),
Payload: []byte("whatever"),
},
}
require.NoError(t, msg.Sign(privateKey))

allowlist.On("Allow", addr).Return(true).Once()
handler.HandleGatewayMessage(testutils.Context(t), "gw1", &msg.Body)
})
})
}
Loading

0 comments on commit 369ce8c

Please sign in to comment.