Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SC-7143 endpoint lookup and error handling #104

Merged
merged 4 commits into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions containers/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ services:
- RVASP_FIXTURES_PATH=/fixtures
- RVASP_DATABASE_MAX_RETRIES=5
volumes:
- ../pkg/rvasp/fixtures:/fixtures
- ../fixtures/rvasps:/fixtures

gds:
image: trisa/gds:latest
Expand Down Expand Up @@ -59,7 +59,7 @@ services:
- 5434:4434
- 5435:4435
environment:
- RVASP_NAME=api.alice.vaspbot.net
- RVASP_NAME=alice
- RVASP_DATABASE_DSN=postgres://postgres:postgres@db:5432/rvasp?sslmode=disable
- RVASP_CERT_PATH=/certs/cert.pem
- RVASP_TRUST_CHAIN_PATH=/certs/cert.pem
Expand All @@ -83,7 +83,7 @@ services:
- 6434:4434
- 6435:4435
environment:
- RVASP_NAME=api.bob.vaspbot.net
- RVASP_NAME=bob
- RVASP_DATABASE_DSN=postgres://postgres:postgres@db:5432/rvasp?sslmode=disable
- RVASP_CERT_PATH=/certs/cert.pem
- RVASP_TRUST_CHAIN_PATH=/certs/cert.pem
Expand All @@ -107,7 +107,7 @@ services:
- 7434:4434
- 7435:4435
environment:
- RVASP_NAME=api.evil.vaspbot.net
- RVASP_NAME=evil
- RVASP_DATABASE_DSN=postgres://postgres:postgres@db:5432/rvasp?sslmode=disable
- RVASP_CERT_PATH=/certs/cert.pem
- RVASP_TRUST_CHAIN_PATH=/certs/cert.pem
Expand All @@ -131,7 +131,7 @@ services:
- 8434:4434
- 8435:4435
environment:
- RVASP_NAME=api.charlie.vaspbot.net
- RVASP_NAME=charlie
- RVASP_DATABASE_DSN=postgres://postgres:postgres@db:5432/rvasp?sslmode=disable
- RVASP_CERT_PATH=/certs/cert.pem
- RVASP_TRUST_CHAIN_PATH=/certs/cert.pem
Expand Down
1 change: 1 addition & 0 deletions pkg/rvasp/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func NewTRISAMock(conf *config.Config) (s *TRISA, remotePeers *peers.Peers, mock
remotePeers = peers.NewMock(s.certs, s.chain, conf.GDS.URL)
remotePeers.Add(&peers.PeerInfo{
CommonName: "alice",
Endpoint: "gds.example.io:443",
SigningKey: &s.sign.PublicKey,
})
parent.peers = remotePeers
Expand Down
77 changes: 77 additions & 0 deletions pkg/rvasp/peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package rvasp

import (
"crypto/rsa"
"fmt"

"github.com/rs/zerolog/log"
"github.com/trisacrypto/trisa/pkg/trisa/peers"
)

// TODO: The peers cache needs to be flushed periodically to prevent the situation
// where remote peers change their endpoints or certificates and the rVASPs are stuck
// using the old info.

// fetchPeer returns the peer with the common name from the cache and performs a lookup
// against the directory service if the peer does not have an endpoint.
func (s *Server) fetchPeer(commonName string) (peer *peers.Peer, err error) {
// Retrieve or create the peer from the cache
if peer, err = s.peers.Get(commonName); err != nil {
log.Error().Err(err).Msg("could not create or fetch peer")
return nil, fmt.Errorf("could not create or fetch peer: %s", err)
}

// Ensure that the remote peer has an endpoint to connect to
if err = s.checkEndpoint(peer); err != nil {
log.Warn().Err(err).Msg("could not fetch endpoint from remote peer")
return nil, fmt.Errorf("could not fetch endpoint from remote peer: %s", err)
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bbengfort @DanielSollis This might cause some issues when people get new certificates, I suppose we could just reboot the rVASPs but for the long term do we need to just have the rVASPs always do the lookup or manually flush the cache periodically?


return peer, nil
}

// fetchSigningKey returns the signing key for the peer, performing an endpoint lookup
// and key exchange if necessary.
func (s *Server) fetchSigningKey(peer *peers.Peer) (key *rsa.PublicKey, err error) {
// Ensure that the remote peer has an endpoint to connect to
if err = s.checkEndpoint(peer); err != nil {
log.Warn().Err(err).Msg("could not fetch endpoint from remote peer")
return nil, fmt.Errorf("could not fetch endpoint from remote peer: %s", err)
}

if peer.SigningKey() == nil {
// If no key is available, perform a key exchange with the remote peer
if peer.ExchangeKeys(true); err != nil {
log.Warn().Str("common_name", peer.String()).Err(err).Msg("could not exchange keys with remote peer")
return nil, fmt.Errorf("could not exchange keys with remote peer: %s", err)
}

// Verify the key is now available on the peer
if peer.SigningKey() == nil {
log.Error().Str("common_name", peer.String()).Msg("peer has no key after key exchange")
return nil, fmt.Errorf("peer has no key after key exchange")
}
}

return peer.SigningKey(), nil
}

// checkEndpoint ensures that the peer has an endpoint to connect to, and performs a
// lookup against the directory service if it does not.
func (s *Server) checkEndpoint(peer *peers.Peer) (err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rename this something like "resolveEndpoint", "checkEndpoint" doesn't really communicate that the endpoint might also be set.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a great point, I was struggling to think of a good name which communicates the behavior but I think resolveEndpoint is a bit better.

// If the endpoint is not in the peer, do the lookup to fetch the endpoint
if peer.Info().Endpoint == "" {
var remote *peers.Peer
if remote, err = s.peers.Lookup(peer.String()); err != nil {
log.Warn().Str("peer", peer.String()).Err(err).Msg("could not lookup peer")
return fmt.Errorf("could not lookup peer: %s", err)
}

if remote.Info().Endpoint == "" {
log.Error().Str("peer", peer.String()).Msg("peer has no endpoint after lookup")
return fmt.Errorf("peer has no endpoint after lookup")
}
peer = remote
}
return nil
}
92 changes: 51 additions & 41 deletions pkg/rvasp/rvasp.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,18 +291,18 @@ func (s *Server) fetchBeneficiaryWallet(req *pb.TransferRequest) (wallet *db.Wal
// information is not included in the payload. This function handles pending responses
// from the beneficiary saving the transaction in an "await" state in the database.
func (s *Server) sendTransfer(xfer *db.Transaction, beneficiary *db.Wallet, partial bool) (err error) {
// Conduct a GDS lookup if necessary to get the endpoint
// Fetch the remote peer
var peer *peers.Peer
if peer, err = s.peers.Search(beneficiary.Provider.Name); err != nil {
log.Warn().Err(err).Msg("could not search peer from directory service")
return status.Errorf(codes.FailedPrecondition, "could not search peer from directory service: %s", err)
if peer, err = s.fetchPeer(beneficiary.Provider.Name); err != nil {
log.Warn().Err(err).Msg("could not fetch beneficiary peer")
return status.Errorf(codes.FailedPrecondition, "could not fetch beneficiary peer: %s", err)
}

// Ensure that the local RVASP has signing keys for the remote, otherwise perform key exchange
// Fetch the signing key
var signKey *rsa.PublicKey
if signKey, err = peer.ExchangeKeys(true); err != nil {
log.Warn().Err(err).Msg("could not exchange keys with remote peer")
return status.Errorf(codes.FailedPrecondition, "could not exchange keys with remote peer: %s", err)
if signKey, err = s.fetchSigningKey(peer); err != nil {
log.Warn().Err(err).Msg("could not fetch signing key from benficiary peer")
return status.Errorf(codes.FailedPrecondition, "could not fetch signing key from beneficiary peer: %s", err)
}

if err = s.db.Create(xfer).Error; err != nil {
Expand Down Expand Up @@ -387,9 +387,10 @@ func (s *Server) sendTransfer(xfer *db.Transaction, beneficiary *db.Wallet, part

// Parse the response payload
var pending *generic.Pending
if identity, transaction, pending, err = parsePayload(payload, true); err != nil {
log.Warn().Err(err).Msg("TRISA protocol error while parsing payload")
return status.Errorf(codes.FailedPrecondition, "TRISA protocol error: %s", err)
var parseError *protocol.Error
if identity, transaction, pending, parseError = parsePayload(payload, true); parseError != nil {
log.Warn().Str("message", parseError.Message).Msg("TRISA protocol error while parsing payload")
return status.Errorf(codes.FailedPrecondition, "TRISA protocol error: %s", parseError.Message)
}

// Update the transaction record with the identity payload
Expand Down Expand Up @@ -462,11 +463,11 @@ func (s *Server) sendTransfer(xfer *db.Transaction, beneficiary *db.Wallet, part

// sendError sends a TRISA error to the beneficiary.
func (s *Server) sendError(xfer *db.Transaction, beneficiary *db.Wallet) (err error) {
// Conduct a GDS lookup if necessary to get the endpoint
// Fetch the remote peer
var peer *peers.Peer
if peer, err = s.peers.Search(beneficiary.Provider.Name); err != nil {
log.Warn().Err(err).Msg("could not search peer from directory service")
return status.Errorf(codes.FailedPrecondition, "could not search peer from directory service: %s", err)
if peer, err = s.fetchPeer(beneficiary.Provider.Name); err != nil {
log.Warn().Err(err).Msg("could not fetch beneficiary peer")
return status.Errorf(codes.FailedPrecondition, "could not fetch beneficiary peer: %s", err)
}

reject := protocol.Errorf(protocol.ComplianceCheckFail, "rVASP mock compliance check failed")
Expand Down Expand Up @@ -494,7 +495,7 @@ func (s *Server) sendError(xfer *db.Transaction, beneficiary *db.Wallet) (err er

// respondAsync responds to a serviced transfer request from the beneficiary by
// continuing or completing the asynchronous handshake.
func (s *Server) respondAsync(peer *peers.Peer, payload *protocol.Payload, identity *ivms101.IdentityPayload, transaction *generic.Transaction, xfer *db.Transaction) (out *protocol.SecureEnvelope, err error) {
func (s *Server) respondAsync(peer *peers.Peer, payload *protocol.Payload, identity *ivms101.IdentityPayload, transaction *generic.Transaction, xfer *db.Transaction) (out *protocol.SecureEnvelope, transferError *protocol.Error) {
// Secure envelope was successfully received
now := time.Now()

Expand All @@ -504,46 +505,54 @@ func (s *Server) respondAsync(peer *peers.Peer, payload *protocol.Payload, ident
return nil, protocol.Errorf(protocol.ComplianceCheckFail, "received expired transaction")
}

// Fetch the signing key from the remote peer
var signKey *rsa.PublicKey
var err error
if signKey, err = s.fetchSigningKey(peer); err != nil {
log.Warn().Err(err).Msg("could not fetch signing key from beneficiary peer")
return nil, protocol.Errorf(protocol.NoSigningKey, "could not fetch signing key from beneficiary peer: %s", err)
}

// Marshal the identity payload into the transaction record
var data []byte
if data, err = protojson.Marshal(identity); err != nil {
log.Error().Err(err).Msg("could not marshal identity payload")
return nil, status.Errorf(codes.Internal, "could not marshal identity payload: %s", err)
return nil, protocol.Errorf(protocol.InternalError, "could not marshal identity payload: %s", err)
}
xfer.Identity = string(data)

// Marshal the transaction payload into the transaction record
if data, err = protojson.Marshal(transaction); err != nil {
log.Error().Err(err).Msg("could not marshal transaction payload")
return nil, status.Errorf(codes.Internal, "could not marshal transaction payload: %s", err)
return nil, protocol.Errorf(protocol.InternalError, "could not marshal transaction payload: %s", err)
}
xfer.Transaction = string(data)

// Fetch the beneficiary identity
if err = s.db.LookupIdentity(transaction.Beneficiary).First(&xfer.Beneficiary).Error; err != nil {
log.Error().Err(err).Str("wallet_address", transaction.Beneficiary).Msg("could not lookup beneficiary identity")
return nil, status.Errorf(codes.Internal, "could not lookup beneficiary identity: %s", err)
return nil, protocol.Errorf(protocol.InternalError, "could not lookup beneficiary identity: %s", err)
}

// Save the peer name so we can access it later
xfer.Beneficiary.Provider = peer.String()
if err = s.db.Save(&xfer.Beneficiary).Error; err != nil {
log.Error().Err(err).Msg("could not save beneficiary identity")
return nil, status.Errorf(codes.Internal, "could not save beneficiary identity: %s", err)
return nil, protocol.Errorf(protocol.InternalError, "could not save beneficiary identity: %s", err)
}

// Create the response envelope
out, reject, err := envelope.Seal(payload, envelope.WithRSAPublicKey(peer.SigningKey()))
out, reject, err := envelope.Seal(payload, envelope.WithRSAPublicKey(signKey))
if err != nil {
if reject != nil {
if out, err = envelope.Reject(reject); err != nil {
return nil, status.Errorf(codes.FailedPrecondition, "TRISA protocol error: %s", err)
return nil, protocol.Errorf(protocol.EnvelopeDecodeFail, "TRISA protocol error: %s", err)
}
xfer.SetState(pb.TransactionState_REJECTED)
return out, nil
}
log.Warn().Err(err).Msg("TRISA protocol error while sealing envelope")
return nil, status.Errorf(codes.FailedPrecondition, "TRISA protocol error: %s", err)
return nil, protocol.Errorf(protocol.EnvelopeDecodeFail, "TRISA protocol error: %s", err)
}

// Check the transaction state
Expand All @@ -566,7 +575,7 @@ func (s *Server) respondAsync(peer *peers.Peer, payload *protocol.Payload, ident
account.Balance = account.Balance.Sub(xfer.Amount)
if err = s.db.Save(&account).Error; err != nil {
log.Error().Err(err).Msg("could not save originator account")
return nil, status.Errorf(codes.Internal, "could not save originator account: %s", err)
return nil, protocol.Errorf(protocol.InternalError, "could not save originator account: %s", err)
}
xfer.SetState(pb.TransactionState_COMPLETED)
default:
Expand Down Expand Up @@ -601,6 +610,20 @@ func (s *Server) continueAsync(xfer *db.Transaction) (err error) {
return fmt.Errorf("could not fetch beneficiary address")
}

// Fetch the remote peer
var peer *peers.Peer
if peer, err = s.fetchPeer(beneficiary.Provider); err != nil {
log.Error().Err(err).Msg("could not fetch beneficiary peer")
return fmt.Errorf("could not fetch beneficiary peer: %s", err)
}

// Fetch the signing key from the remote peer
var signKey *rsa.PublicKey
if signKey, err = s.fetchSigningKey(peer); err != nil {
log.Warn().Err(err).Msg("could not fetch signing key from beneficiary peer")
return fmt.Errorf("could not fetch signing key from beneficiary peer: %s", err)
}

// Fill the transaction with a new TxID to continue the handshake
var payload *protocol.Payload
transaction.Txid = uuid.New().String()
Expand All @@ -609,20 +632,6 @@ func (s *Server) continueAsync(xfer *db.Transaction) (err error) {
return fmt.Errorf("could not create transfer payload: %s", err)
}

// Conduct a GDS lookup if necessary to get the endpoint
var peer *peers.Peer
if peer, err = s.peers.Search(beneficiary.Provider); err != nil {
log.Error().Err(err).Msg("could not search peer from directory service")
return fmt.Errorf("could not search peer from directory service: %s", err)
}

// Ensure that the local RVASP has signing keys for the remote, otherwise perform key exchange
var signKey *rsa.PublicKey
if signKey, err = peer.ExchangeKeys(true); err != nil {
log.Warn().Err(err).Msg("could not exchange keys with remote peer")
return fmt.Errorf("could not exchange keys with remote peer: %s", err)
}

// Secure the envelope with the remote beneficiary's signing keys
msg, _, err := envelope.Seal(payload, envelope.WithEnvelopeID(xfer.Envelope), envelope.WithRSAPublicKey(signKey))
if err != nil {
Expand All @@ -645,9 +654,10 @@ func (s *Server) continueAsync(xfer *db.Transaction) (err error) {

// Parse the response payload
var pending *generic.Pending
if identity, _, pending, err = parsePayload(payload, true); err != nil {
log.Warn().Err(err).Msg("TRISA protocol error while parsing payload")
return fmt.Errorf("TRISA protocol error: %s", err)
var parseError *protocol.Error
if identity, _, pending, parseError = parsePayload(payload, true); parseError != nil {
log.Warn().Str("message", parseError.Message).Msg("TRISA protocol error while parsing payload")
return fmt.Errorf("TRISA protocol error: %s", parseError.Message)
}

if pending == nil {
Expand Down
19 changes: 10 additions & 9 deletions pkg/rvasp/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,43 +133,44 @@ func createPendingPayload(pending *generic.Pending, identity *ivms101.IdentityPa

// Parse a TRISA transfer payload, returning the identity payload and either the
// transaction payload or the pending message.
func parsePayload(payload *protocol.Payload, response bool) (identity *ivms101.IdentityPayload, transaction *generic.Transaction, pending *generic.Pending, err error) {
func parsePayload(payload *protocol.Payload, response bool) (identity *ivms101.IdentityPayload, transaction *generic.Transaction, pending *generic.Pending, parseError *protocol.Error) {
// Verify the sent_at timestamp if this is an accepted response payload
if payload.SentAt == "" {
log.Warn().Msg("missing sent at timestamp")
return nil, nil, nil, fmt.Errorf("missing sent_at timestamp")
return nil, nil, nil, protocol.Errorf(protocol.MissingFields, "missing sent_at timestamp")
}

// Payload must contain an identity
if payload.Identity == nil {
log.Warn().Msg("payload does not contain an identity")
return nil, nil, nil, fmt.Errorf("missing identity payload")
return nil, nil, nil, protocol.Errorf(protocol.MissingFields, "missing identity payload")
}

// Payload must contain a transaction
if payload.Transaction == nil {
log.Warn().Msg("payload does not contain a transaction")
return nil, nil, nil, fmt.Errorf("missing transaction payload")
return nil, nil, nil, protocol.Errorf(protocol.MissingFields, "missing transaction payload")
}

// Parse the identity payload
identity = &ivms101.IdentityPayload{}
var err error
if err = payload.Identity.UnmarshalTo(identity); err != nil {
log.Warn().Err(err).Msg("could not unmarshal identity")
return nil, nil, nil, fmt.Errorf("could non unmarshal identity: %s", err)
return nil, nil, nil, protocol.Errorf(protocol.UnparseableIdentity, "could non unmarshal identity: %s", err)
}

// Validate identity fields
if identity.Originator == nil || identity.OriginatingVasp == nil || identity.BeneficiaryVasp == nil || identity.Beneficiary == nil {
log.Warn().Msg("incomplete identity payload")
return nil, nil, nil, fmt.Errorf("incomplete identity payload")
return nil, nil, nil, protocol.Errorf(protocol.IncompleteIdentity, "incomplete identity payload")
}

// Parse the transaction message type
var msgTx proto.Message
if msgTx, err = payload.Transaction.UnmarshalNew(); err != nil {
log.Warn().Err(err).Str("transaction_type", payload.Transaction.TypeUrl).Msg("could not unmarshal incoming transaction payload")
return nil, nil, nil, status.Errorf(codes.FailedPrecondition, "could not unmarshal transaction payload: %s", err)
return nil, nil, nil, protocol.Errorf(protocol.UnparseableTransaction, "could not unmarshal transaction payload: %s", err)
}

switch tx := msgTx.(type) {
Expand All @@ -180,13 +181,13 @@ func parsePayload(payload *protocol.Payload, response bool) (identity *ivms101.I
// NOTE: pending messages and intermediate messages will not contain received_at
if response && payload.ReceivedAt == "" {
log.Warn().Msg("missing received at timestamp")
return nil, nil, nil, fmt.Errorf("missing received_at timestamp")
return nil, nil, nil, protocol.Errorf(protocol.MissingFields, "missing received_at timestamp")
}
case *generic.Pending:
pending = tx
default:
log.Warn().Str("transaction_type", payload.Transaction.TypeUrl).Msg("could not handle incoming transaction payload")
return nil, nil, nil, fmt.Errorf("unexpected transaction payload type: %s", payload.Transaction.TypeUrl)
return nil, nil, nil, protocol.Errorf(protocol.UnparseableTransaction, "unexpected transaction payload type: %s", payload.Transaction.TypeUrl)
}
return identity, transaction, pending, nil
}
Loading