Skip to content

Commit

Permalink
Peer endpoint lookup and error handling (#104)
Browse files Browse the repository at this point in the history
  • Loading branch information
pdeziel authored Jul 15, 2022
1 parent 57238d6 commit 5de3b34
Show file tree
Hide file tree
Showing 10 changed files with 324 additions and 172 deletions.
10 changes: 5 additions & 5 deletions containers/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ services:
- RVASP_FIXTURES_PATH=/fixtures
- RVASP_DATABASE_MAX_RETRIES=5
volumes:
- ../pkg/rvasp/fixtures:/fixtures
- ../fixtures/rvasps:/fixtures

gds:
image: trisa/gds:latest
Expand Down Expand Up @@ -59,7 +59,7 @@ services:
- 5434:4434
- 5435:4435
environment:
- RVASP_NAME=api.alice.vaspbot.net
- RVASP_NAME=alice
- RVASP_DATABASE_DSN=postgres://postgres:postgres@db:5432/rvasp?sslmode=disable
- RVASP_CERT_PATH=/certs/cert.pem
- RVASP_TRUST_CHAIN_PATH=/certs/cert.pem
Expand All @@ -83,7 +83,7 @@ services:
- 6434:4434
- 6435:4435
environment:
- RVASP_NAME=api.bob.vaspbot.net
- RVASP_NAME=bob
- RVASP_DATABASE_DSN=postgres://postgres:postgres@db:5432/rvasp?sslmode=disable
- RVASP_CERT_PATH=/certs/cert.pem
- RVASP_TRUST_CHAIN_PATH=/certs/cert.pem
Expand All @@ -107,7 +107,7 @@ services:
- 7434:4434
- 7435:4435
environment:
- RVASP_NAME=api.evil.vaspbot.net
- RVASP_NAME=evil
- RVASP_DATABASE_DSN=postgres://postgres:postgres@db:5432/rvasp?sslmode=disable
- RVASP_CERT_PATH=/certs/cert.pem
- RVASP_TRUST_CHAIN_PATH=/certs/cert.pem
Expand All @@ -131,7 +131,7 @@ services:
- 8434:4434
- 8435:4435
environment:
- RVASP_NAME=api.charlie.vaspbot.net
- RVASP_NAME=charlie
- RVASP_DATABASE_DSN=postgres://postgres:postgres@db:5432/rvasp?sslmode=disable
- RVASP_CERT_PATH=/certs/cert.pem
- RVASP_TRUST_CHAIN_PATH=/certs/cert.pem
Expand Down
1 change: 1 addition & 0 deletions pkg/rvasp/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func NewTRISAMock(conf *config.Config) (s *TRISA, remotePeers *peers.Peers, mock
remotePeers = peers.NewMock(s.certs, s.chain, conf.GDS.URL)
remotePeers.Add(&peers.PeerInfo{
CommonName: "alice",
Endpoint: "gds.example.io:443",
SigningKey: &s.sign.PublicKey,
})
parent.peers = remotePeers
Expand Down
76 changes: 76 additions & 0 deletions pkg/rvasp/peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package rvasp

import (
"crypto/rsa"
"fmt"

"github.com/rs/zerolog/log"
"github.com/trisacrypto/trisa/pkg/trisa/peers"
)

// TODO: The peers cache needs to be flushed periodically to prevent the situation
// where remote peers change their endpoints or certificates and the rVASPs are stuck
// using the old info.

// fetchPeer returns the peer with the common name from the cache and performs a lookup
// against the directory service if the peer does not have an endpoint.
func (s *Server) fetchPeer(commonName string) (peer *peers.Peer, err error) {
// Retrieve or create the peer from the cache
if peer, err = s.peers.Get(commonName); err != nil {
log.Error().Err(err).Msg("could not create or fetch peer")
return nil, fmt.Errorf("could not create or fetch peer: %s", err)
}

// Ensure that the remote peer has an endpoint to connect to
if err = s.resolveEndpoint(peer); err != nil {
log.Warn().Err(err).Msg("could not fetch endpoint from remote peer")
return nil, fmt.Errorf("could not fetch endpoint from remote peer: %s", err)
}

return peer, nil
}

// fetchSigningKey returns the signing key for the peer, performing an endpoint lookup
// and key exchange if necessary.
func (s *Server) fetchSigningKey(peer *peers.Peer) (key *rsa.PublicKey, err error) {
// Ensure that the remote peer has an endpoint to connect to
if err = s.resolveEndpoint(peer); err != nil {
log.Warn().Err(err).Msg("could not fetch endpoint from remote peer")
return nil, fmt.Errorf("could not fetch endpoint from remote peer: %s", err)
}

if peer.SigningKey() == nil {
// If no key is available, perform a key exchange with the remote peer
if peer.ExchangeKeys(true); err != nil {
log.Warn().Str("common_name", peer.String()).Err(err).Msg("could not exchange keys with remote peer")
return nil, fmt.Errorf("could not exchange keys with remote peer: %s", err)
}

// Verify the key is now available on the peer
if peer.SigningKey() == nil {
log.Error().Str("common_name", peer.String()).Msg("peer has no key after key exchange")
return nil, fmt.Errorf("peer has no key after key exchange")
}
}

return peer.SigningKey(), nil
}

// resolveEndpoint ensures that the peer has an endpoint to connect to, and performs a
// lookup against the directory service to set the endpoint on the peer if necessary.
func (s *Server) resolveEndpoint(peer *peers.Peer) (err error) {
// If the endpoint is not in the peer, do the lookup to fetch the endpoint
if peer.Info().Endpoint == "" {
var remote *peers.Peer
if remote, err = s.peers.Lookup(peer.String()); err != nil {
log.Warn().Str("peer", peer.String()).Err(err).Msg("could not lookup peer")
return fmt.Errorf("could not lookup peer: %s", err)
}

if remote.Info().Endpoint == "" {
log.Error().Str("peer", peer.String()).Msg("peer has no endpoint after lookup")
return fmt.Errorf("peer has no endpoint after lookup")
}
}
return nil
}
92 changes: 51 additions & 41 deletions pkg/rvasp/rvasp.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,18 +291,18 @@ func (s *Server) fetchBeneficiaryWallet(req *pb.TransferRequest) (wallet *db.Wal
// information is not included in the payload. This function handles pending responses
// from the beneficiary saving the transaction in an "await" state in the database.
func (s *Server) sendTransfer(xfer *db.Transaction, beneficiary *db.Wallet, partial bool) (err error) {
// Conduct a GDS lookup if necessary to get the endpoint
// Fetch the remote peer
var peer *peers.Peer
if peer, err = s.peers.Search(beneficiary.Provider.Name); err != nil {
log.Warn().Err(err).Msg("could not search peer from directory service")
return status.Errorf(codes.FailedPrecondition, "could not search peer from directory service: %s", err)
if peer, err = s.fetchPeer(beneficiary.Provider.Name); err != nil {
log.Warn().Err(err).Msg("could not fetch beneficiary peer")
return status.Errorf(codes.FailedPrecondition, "could not fetch beneficiary peer: %s", err)
}

// Ensure that the local RVASP has signing keys for the remote, otherwise perform key exchange
// Fetch the signing key
var signKey *rsa.PublicKey
if signKey, err = peer.ExchangeKeys(true); err != nil {
log.Warn().Err(err).Msg("could not exchange keys with remote peer")
return status.Errorf(codes.FailedPrecondition, "could not exchange keys with remote peer: %s", err)
if signKey, err = s.fetchSigningKey(peer); err != nil {
log.Warn().Err(err).Msg("could not fetch signing key from benficiary peer")
return status.Errorf(codes.FailedPrecondition, "could not fetch signing key from beneficiary peer: %s", err)
}

if err = s.db.Create(xfer).Error; err != nil {
Expand Down Expand Up @@ -387,9 +387,10 @@ func (s *Server) sendTransfer(xfer *db.Transaction, beneficiary *db.Wallet, part

// Parse the response payload
var pending *generic.Pending
if identity, transaction, pending, err = parsePayload(payload, true); err != nil {
log.Warn().Err(err).Msg("TRISA protocol error while parsing payload")
return status.Errorf(codes.FailedPrecondition, "TRISA protocol error: %s", err)
var parseError *protocol.Error
if identity, transaction, pending, parseError = parsePayload(payload, true); parseError != nil {
log.Warn().Str("message", parseError.Message).Msg("TRISA protocol error while parsing payload")
return status.Errorf(codes.FailedPrecondition, "TRISA protocol error: %s", parseError.Message)
}

// Update the transaction record with the identity payload
Expand Down Expand Up @@ -462,11 +463,11 @@ func (s *Server) sendTransfer(xfer *db.Transaction, beneficiary *db.Wallet, part

// sendError sends a TRISA error to the beneficiary.
func (s *Server) sendError(xfer *db.Transaction, beneficiary *db.Wallet) (err error) {
// Conduct a GDS lookup if necessary to get the endpoint
// Fetch the remote peer
var peer *peers.Peer
if peer, err = s.peers.Search(beneficiary.Provider.Name); err != nil {
log.Warn().Err(err).Msg("could not search peer from directory service")
return status.Errorf(codes.FailedPrecondition, "could not search peer from directory service: %s", err)
if peer, err = s.fetchPeer(beneficiary.Provider.Name); err != nil {
log.Warn().Err(err).Msg("could not fetch beneficiary peer")
return status.Errorf(codes.FailedPrecondition, "could not fetch beneficiary peer: %s", err)
}

reject := protocol.Errorf(protocol.ComplianceCheckFail, "rVASP mock compliance check failed")
Expand Down Expand Up @@ -494,7 +495,7 @@ func (s *Server) sendError(xfer *db.Transaction, beneficiary *db.Wallet) (err er

// respondAsync responds to a serviced transfer request from the beneficiary by
// continuing or completing the asynchronous handshake.
func (s *Server) respondAsync(peer *peers.Peer, payload *protocol.Payload, identity *ivms101.IdentityPayload, transaction *generic.Transaction, xfer *db.Transaction) (out *protocol.SecureEnvelope, err error) {
func (s *Server) respondAsync(peer *peers.Peer, payload *protocol.Payload, identity *ivms101.IdentityPayload, transaction *generic.Transaction, xfer *db.Transaction) (out *protocol.SecureEnvelope, transferError *protocol.Error) {
// Secure envelope was successfully received
now := time.Now()

Expand All @@ -504,46 +505,54 @@ func (s *Server) respondAsync(peer *peers.Peer, payload *protocol.Payload, ident
return nil, protocol.Errorf(protocol.ComplianceCheckFail, "received expired transaction")
}

// Fetch the signing key from the remote peer
var signKey *rsa.PublicKey
var err error
if signKey, err = s.fetchSigningKey(peer); err != nil {
log.Warn().Err(err).Msg("could not fetch signing key from beneficiary peer")
return nil, protocol.Errorf(protocol.NoSigningKey, "could not fetch signing key from beneficiary peer: %s", err)
}

// Marshal the identity payload into the transaction record
var data []byte
if data, err = protojson.Marshal(identity); err != nil {
log.Error().Err(err).Msg("could not marshal identity payload")
return nil, status.Errorf(codes.Internal, "could not marshal identity payload: %s", err)
return nil, protocol.Errorf(protocol.InternalError, "could not marshal identity payload: %s", err)
}
xfer.Identity = string(data)

// Marshal the transaction payload into the transaction record
if data, err = protojson.Marshal(transaction); err != nil {
log.Error().Err(err).Msg("could not marshal transaction payload")
return nil, status.Errorf(codes.Internal, "could not marshal transaction payload: %s", err)
return nil, protocol.Errorf(protocol.InternalError, "could not marshal transaction payload: %s", err)
}
xfer.Transaction = string(data)

// Fetch the beneficiary identity
if err = s.db.LookupIdentity(transaction.Beneficiary).First(&xfer.Beneficiary).Error; err != nil {
log.Error().Err(err).Str("wallet_address", transaction.Beneficiary).Msg("could not lookup beneficiary identity")
return nil, status.Errorf(codes.Internal, "could not lookup beneficiary identity: %s", err)
return nil, protocol.Errorf(protocol.InternalError, "could not lookup beneficiary identity: %s", err)
}

// Save the peer name so we can access it later
xfer.Beneficiary.Provider = peer.String()
if err = s.db.Save(&xfer.Beneficiary).Error; err != nil {
log.Error().Err(err).Msg("could not save beneficiary identity")
return nil, status.Errorf(codes.Internal, "could not save beneficiary identity: %s", err)
return nil, protocol.Errorf(protocol.InternalError, "could not save beneficiary identity: %s", err)
}

// Create the response envelope
out, reject, err := envelope.Seal(payload, envelope.WithRSAPublicKey(peer.SigningKey()))
out, reject, err := envelope.Seal(payload, envelope.WithRSAPublicKey(signKey))
if err != nil {
if reject != nil {
if out, err = envelope.Reject(reject); err != nil {
return nil, status.Errorf(codes.FailedPrecondition, "TRISA protocol error: %s", err)
return nil, protocol.Errorf(protocol.EnvelopeDecodeFail, "TRISA protocol error: %s", err)
}
xfer.SetState(pb.TransactionState_REJECTED)
return out, nil
}
log.Warn().Err(err).Msg("TRISA protocol error while sealing envelope")
return nil, status.Errorf(codes.FailedPrecondition, "TRISA protocol error: %s", err)
return nil, protocol.Errorf(protocol.EnvelopeDecodeFail, "TRISA protocol error: %s", err)
}

// Check the transaction state
Expand All @@ -566,7 +575,7 @@ func (s *Server) respondAsync(peer *peers.Peer, payload *protocol.Payload, ident
account.Balance = account.Balance.Sub(xfer.Amount)
if err = s.db.Save(&account).Error; err != nil {
log.Error().Err(err).Msg("could not save originator account")
return nil, status.Errorf(codes.Internal, "could not save originator account: %s", err)
return nil, protocol.Errorf(protocol.InternalError, "could not save originator account: %s", err)
}
xfer.SetState(pb.TransactionState_COMPLETED)
default:
Expand Down Expand Up @@ -601,6 +610,20 @@ func (s *Server) continueAsync(xfer *db.Transaction) (err error) {
return fmt.Errorf("could not fetch beneficiary address")
}

// Fetch the remote peer
var peer *peers.Peer
if peer, err = s.fetchPeer(beneficiary.Provider); err != nil {
log.Error().Err(err).Msg("could not fetch beneficiary peer")
return fmt.Errorf("could not fetch beneficiary peer: %s", err)
}

// Fetch the signing key from the remote peer
var signKey *rsa.PublicKey
if signKey, err = s.fetchSigningKey(peer); err != nil {
log.Warn().Err(err).Msg("could not fetch signing key from beneficiary peer")
return fmt.Errorf("could not fetch signing key from beneficiary peer: %s", err)
}

// Fill the transaction with a new TxID to continue the handshake
var payload *protocol.Payload
transaction.Txid = uuid.New().String()
Expand All @@ -609,20 +632,6 @@ func (s *Server) continueAsync(xfer *db.Transaction) (err error) {
return fmt.Errorf("could not create transfer payload: %s", err)
}

// Conduct a GDS lookup if necessary to get the endpoint
var peer *peers.Peer
if peer, err = s.peers.Search(beneficiary.Provider); err != nil {
log.Error().Err(err).Msg("could not search peer from directory service")
return fmt.Errorf("could not search peer from directory service: %s", err)
}

// Ensure that the local RVASP has signing keys for the remote, otherwise perform key exchange
var signKey *rsa.PublicKey
if signKey, err = peer.ExchangeKeys(true); err != nil {
log.Warn().Err(err).Msg("could not exchange keys with remote peer")
return fmt.Errorf("could not exchange keys with remote peer: %s", err)
}

// Secure the envelope with the remote beneficiary's signing keys
msg, _, err := envelope.Seal(payload, envelope.WithEnvelopeID(xfer.Envelope), envelope.WithRSAPublicKey(signKey))
if err != nil {
Expand All @@ -645,9 +654,10 @@ func (s *Server) continueAsync(xfer *db.Transaction) (err error) {

// Parse the response payload
var pending *generic.Pending
if identity, _, pending, err = parsePayload(payload, true); err != nil {
log.Warn().Err(err).Msg("TRISA protocol error while parsing payload")
return fmt.Errorf("TRISA protocol error: %s", err)
var parseError *protocol.Error
if identity, _, pending, parseError = parsePayload(payload, true); parseError != nil {
log.Warn().Str("message", parseError.Message).Msg("TRISA protocol error while parsing payload")
return fmt.Errorf("TRISA protocol error: %s", parseError.Message)
}

if pending == nil {
Expand Down
19 changes: 10 additions & 9 deletions pkg/rvasp/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,43 +133,44 @@ func createPendingPayload(pending *generic.Pending, identity *ivms101.IdentityPa

// Parse a TRISA transfer payload, returning the identity payload and either the
// transaction payload or the pending message.
func parsePayload(payload *protocol.Payload, response bool) (identity *ivms101.IdentityPayload, transaction *generic.Transaction, pending *generic.Pending, err error) {
func parsePayload(payload *protocol.Payload, response bool) (identity *ivms101.IdentityPayload, transaction *generic.Transaction, pending *generic.Pending, parseError *protocol.Error) {
// Verify the sent_at timestamp if this is an accepted response payload
if payload.SentAt == "" {
log.Warn().Msg("missing sent at timestamp")
return nil, nil, nil, fmt.Errorf("missing sent_at timestamp")
return nil, nil, nil, protocol.Errorf(protocol.MissingFields, "missing sent_at timestamp")
}

// Payload must contain an identity
if payload.Identity == nil {
log.Warn().Msg("payload does not contain an identity")
return nil, nil, nil, fmt.Errorf("missing identity payload")
return nil, nil, nil, protocol.Errorf(protocol.MissingFields, "missing identity payload")
}

// Payload must contain a transaction
if payload.Transaction == nil {
log.Warn().Msg("payload does not contain a transaction")
return nil, nil, nil, fmt.Errorf("missing transaction payload")
return nil, nil, nil, protocol.Errorf(protocol.MissingFields, "missing transaction payload")
}

// Parse the identity payload
identity = &ivms101.IdentityPayload{}
var err error
if err = payload.Identity.UnmarshalTo(identity); err != nil {
log.Warn().Err(err).Msg("could not unmarshal identity")
return nil, nil, nil, fmt.Errorf("could non unmarshal identity: %s", err)
return nil, nil, nil, protocol.Errorf(protocol.UnparseableIdentity, "could non unmarshal identity: %s", err)
}

// Validate identity fields
if identity.Originator == nil || identity.OriginatingVasp == nil || identity.BeneficiaryVasp == nil || identity.Beneficiary == nil {
log.Warn().Msg("incomplete identity payload")
return nil, nil, nil, fmt.Errorf("incomplete identity payload")
return nil, nil, nil, protocol.Errorf(protocol.IncompleteIdentity, "incomplete identity payload")
}

// Parse the transaction message type
var msgTx proto.Message
if msgTx, err = payload.Transaction.UnmarshalNew(); err != nil {
log.Warn().Err(err).Str("transaction_type", payload.Transaction.TypeUrl).Msg("could not unmarshal incoming transaction payload")
return nil, nil, nil, status.Errorf(codes.FailedPrecondition, "could not unmarshal transaction payload: %s", err)
return nil, nil, nil, protocol.Errorf(protocol.UnparseableTransaction, "could not unmarshal transaction payload: %s", err)
}

switch tx := msgTx.(type) {
Expand All @@ -180,13 +181,13 @@ func parsePayload(payload *protocol.Payload, response bool) (identity *ivms101.I
// NOTE: pending messages and intermediate messages will not contain received_at
if response && payload.ReceivedAt == "" {
log.Warn().Msg("missing received at timestamp")
return nil, nil, nil, fmt.Errorf("missing received_at timestamp")
return nil, nil, nil, protocol.Errorf(protocol.MissingFields, "missing received_at timestamp")
}
case *generic.Pending:
pending = tx
default:
log.Warn().Str("transaction_type", payload.Transaction.TypeUrl).Msg("could not handle incoming transaction payload")
return nil, nil, nil, fmt.Errorf("unexpected transaction payload type: %s", payload.Transaction.TypeUrl)
return nil, nil, nil, protocol.Errorf(protocol.UnparseableTransaction, "unexpected transaction payload type: %s", payload.Transaction.TypeUrl)
}
return identity, transaction, pending, nil
}
Loading

0 comments on commit 5de3b34

Please sign in to comment.