Skip to content

Commit

Permalink
Panic Recovery Interceptors (#96)
Browse files Browse the repository at this point in the history
  • Loading branch information
bbengfort authored Jun 15, 2022
1 parent 438c810 commit 300168f
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 48 deletions.
4 changes: 4 additions & 0 deletions cmd/rvasp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,10 @@ func initdb(c *cli.Context) (err error) {
conf.Database.DSN = dsn
}

if conf.Database.DSN == "" {
return cli.NewExitError("rvasp database dsn required", 1)
}

if fixtures := c.String("fixtures"); fixtures != "" {
conf.FixturesPath = fixtures
}
Expand Down
8 changes: 8 additions & 0 deletions containers/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ services:
- RVASP_GDS_URL=gds:4433
- RVASP_GDS_INSECURE=true
- RVASP_DATABASE_MAX_RETRIES=5
- RVASP_LOG_LEVEL
- RVASP_CONSOLE_LOG=true
- RVASP_ASYNC_INTERVAL
- RVASP_ASYNC_NOT_BEFORE
- RVASP_ASYNC_NOT_AFTER
Expand All @@ -88,6 +90,8 @@ services:
- RVASP_GDS_URL=gds:4433
- RVASP_GDS_INSECURE=true
- RVASP_DATABASE_MAX_RETRIES=5
- RVASP_LOG_LEVEL
- RVASP_CONSOLE_LOG=true
- RVASP_ASYNC_INTERVAL
- RVASP_ASYNC_NOT_BEFORE
- RVASP_ASYNC_NOT_AFTER
Expand All @@ -110,6 +114,8 @@ services:
- RVASP_GDS_URL=gds:4433
- RVASP_GDS_INSECURE=true
- RVASP_DATABASE_MAX_RETRIES=5
- RVASP_LOG_LEVEL
- RVASP_CONSOLE_LOG=true
- RVASP_ASYNC_INTERVAL
- RVASP_ASYNC_NOT_BEFORE
- RVASP_ASYNC_NOT_AFTER
Expand All @@ -132,6 +138,8 @@ services:
- RVASP_GDS_URL=gds:4433
- RVASP_GDS_INSECURE=true
- RVASP_DATABASE_MAX_RETRIES=5
- RVASP_LOG_LEVEL
- RVASP_CONSOLE_LOG=true
- RVASP_ASYNC_INTERVAL
- RVASP_ASYNC_NOT_BEFORE
- RVASP_ASYNC_NOT_AFTER
Expand Down
95 changes: 62 additions & 33 deletions pkg/rvasp/async.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package rvasp

import (
"context"
"fmt"
"runtime/debug"
"time"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/trisacrypto/testnet/pkg/rvasp/db"
pb "github.com/trisacrypto/testnet/pkg/rvasp/pb/v1"
Expand All @@ -13,58 +16,84 @@ import (
// rVASP database and initiates TRISA transfers back to the originator. This allows the
// rVASPs to simulate asynchronous transactions.
func (s *TRISA) AsyncHandler(stop <-chan struct{}) {
ticker := time.NewTicker(s.parent.conf.AsyncInterval)
var (
ctx context.Context
cancel context.CancelFunc
)

// Create default context and cancel to stop async handler routine before a handler is fired.
_, cancel = context.WithCancel(context.Background())

// Create the ticker to run aync handlers periodically
ticker := time.NewTicker(s.parent.conf.AsyncInterval)
log.Info().Dur("interval", s.parent.conf.AsyncInterval).Msg("asynchronous handler started")

for {
// Wait for the next tick or the stop signal
select {
case <-stop:
log.Info().Msg("asynchronous handler received stop signal")
cancel()
return
case <-ticker.C:
cancel()
}

log.Info().Msg("checking for pending transactions")
// Execute the handle async go routine with cancellation signals
log.Debug().Msg("checking for pending transactions to handle async")
ctx, cancel = context.WithTimeout(context.Background(), s.parent.conf.AsyncInterval)
go s.handleAsync(ctx)
}
}

// Retrieve all pending messages from the database
var (
transactions []db.Transaction
err error
)
if err = s.parent.db.LookupPending().Find(&transactions).Error; err != nil {
log.Error().Err(err).Msg("could not lookup transactions")
continue
func (s *TRISA) handleAsync(ctx context.Context) {
// Recover from panics that occur during async handling.
defer func() {
if r := recover(); r != nil {
log.WithLevel(zerolog.PanicLevel).
Err(fmt.Errorf("%v", r)).
Str("stack_trace", string(debug.Stack())).
Msg("handle async has recovered from a panic")
}
}()

now := time.Now()
for _, tx := range transactions {
// Verify pending transaction is old enough
if now.Before(tx.NotBefore) {
continue
}

// Verify pending transaction has not expired
if now.After(tx.NotAfter) {
log.Info().Uint("id", tx.ID).Time("not_after", tx.NotAfter).Msg("transaction expired")
tx.State = pb.TransactionState_EXPIRED
if err = s.parent.db.Save(&tx).Error; err != nil {
log.Error().Err(err).Uint("id", tx.ID).Msg("could not save expired transaction")
}
continue
}
// Retrieve all pending messages from the database
var (
transactions []db.Transaction
err error
)
if err = s.parent.db.LookupPending().Find(&transactions).Error; err != nil {
log.Error().Err(err).Msg("could not lookup transactions")
return
}

// Acknowledge the transaction with the originator
if err = s.acknowledgeTransaction(&tx); err != nil {
log.Error().Err(err).Uint("id", tx.ID).Msg("could not acknowledge transaction")
tx.State = pb.TransactionState_FAILED
}
now := time.Now()
txloop:
for _, tx := range transactions {
// Verify pending transaction is old enough
if now.Before(tx.NotBefore) {
continue
}

// Save the updated transaction in the database
// Verify pending transaction has not expired
if now.After(tx.NotAfter) {
log.Info().Uint("id", tx.ID).Time("not_after", tx.NotAfter).Msg("transaction expired")
tx.State = pb.TransactionState_EXPIRED
if err = s.parent.db.Save(&tx).Error; err != nil {
log.Error().Err(err).Uint("id", tx.ID).Msg("could not save completed transaction")
log.Error().Err(err).Uint("id", tx.ID).Msg("could not save expired transaction")
}
continue txloop
}

// Acknowledge the transaction with the originator
if err = s.acknowledgeTransaction(&tx); err != nil {
log.Error().Err(err).Uint("id", tx.ID).Msg("could not acknowledge transaction")
tx.State = pb.TransactionState_FAILED
}

// Save the updated transaction in the database
if err = s.parent.db.Save(&tx).Error; err != nil {
log.Error().Err(err).Uint("id", tx.ID).Msg("could not save completed transaction")
}
}
}
Expand Down
72 changes: 72 additions & 0 deletions pkg/rvasp/interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package rvasp

import (
"context"
"fmt"
"runtime/debug"
"time"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// UnaryTraceInterceptor is a gRPC unary server interceptor that traces requests, adds
// zerolog logging and panic recovery. If the handler panics, the panic is logged with
// a stack trace and the client receives a codes.Internal error instead of the panic
// propagating; otherwise the handler's response and error are returned unchanged.
func UnaryTraceInterceptor(ctx context.Context, in interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (out interface{}, err error) {
	// Track how long the method takes to execute.
	start := time.Now()

	// Capture the method name up front so the deferred recovery below never has to
	// dereference info while it is already handling a panic.
	var method string
	if info != nil {
		method = info.FullMethod
	}

	// panicked stays true until the handler returns normally. It covers the cases
	// where recover returns nil even though the handler did not return (for
	// example panic(nil) on older Go releases).
	panicked := true

	// Recover from panics in the handler.
	defer func() {
		if r := recover(); r != nil || panicked {
			log.WithLevel(zerolog.PanicLevel).
				Err(fmt.Errorf("%v", r)).
				Str("method", method).
				Str("latency", time.Since(start).String()).
				Str("stack_trace", string(debug.Stack())).
				Msg("grpc server has recovered from a panic")

			// Never return a partial response alongside the internal error.
			out = nil
			err = status.Error(codes.Internal, "an unhandled exception occurred")
		}
	}()

	// Call the handler to finalize the request and get the response.
	out, err = handler(ctx, in)
	panicked = false

	// Log with zerolog - check out grpclog.LoggerV2 for default logging.
	log.Debug().
		Err(err).
		Str("method", method).
		Str("latency", time.Since(start).String()).
		Msg("gRPC request complete")
	return out, err
}

// StreamTraceInterceptor is a gRPC streaming server interceptor that traces requests,
// adds zerolog logging and panic recovery. If the handler panics, the panic is logged
// with a stack trace and the stream is closed with a codes.Internal error instead of
// the panic propagating; otherwise the handler's error is returned unchanged.
func StreamTraceInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
	// Track how long the method takes to execute.
	start := time.Now()

	// Capture the method name up front so the deferred recovery below never has to
	// dereference info while it is already handling a panic.
	var method string
	if info != nil {
		method = info.FullMethod
	}

	// panicked stays true until the handler returns normally. It covers the cases
	// where recover returns nil even though the handler did not return (for
	// example panic(nil) on older Go releases).
	panicked := true

	// Recover from panics in the handler.
	defer func() {
		if r := recover(); r != nil || panicked {
			log.WithLevel(zerolog.PanicLevel).
				Err(fmt.Errorf("%v", r)).
				Str("method", method).
				Str("duration", time.Since(start).String()).
				Str("stack_trace", string(debug.Stack())).
				Msg("grpc server has recovered from a panic")
			err = status.Error(codes.Internal, "an unhandled exception occurred")
		}
	}()

	// Call the handler to serve the stream until it is closed.
	err = handler(srv, stream)
	panicked = false

	// Log with zerolog - check out grpclog.LoggerV2 for default logging.
	log.Debug().
		Err(err).
		Str("method", method).
		Str("duration", time.Since(start).String()).
		Msg("gRPC stream closed")
	return err
}
4 changes: 2 additions & 2 deletions pkg/rvasp/rvasp.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ type Server struct {

// Serve GRPC requests on the specified address.
func (s *Server) Serve() (err error) {
// Initialize the gRPC server
s.srv = grpc.NewServer()
// Initialize the gRPC server with panic recovery and tracing
s.srv = grpc.NewServer(grpc.UnaryInterceptor(UnaryTraceInterceptor), grpc.StreamInterceptor(StreamTraceInterceptor))
pb.RegisterTRISADemoServer(s.srv, s)
pb.RegisterTRISAIntegrationServer(s.srv, s)

Expand Down
54 changes: 41 additions & 13 deletions pkg/rvasp/trisa.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ func (s *TRISA) Serve() (err error) {
return err
}

s.srv = grpc.NewServer(creds)
// Create a new gRPC server with panic recovery and tracing middleware
s.srv = grpc.NewServer(creds, grpc.UnaryInterceptor(UnaryTraceInterceptor), grpc.StreamInterceptor(StreamTraceInterceptor))
protocol.RegisterTRISANetworkServer(s.srv, s)
protocol.RegisterTRISAHealthServer(s.srv, s)

Expand Down Expand Up @@ -142,12 +143,21 @@ func (s *TRISA) Transfer(ctx context.Context, in *protocol.SecureEnvelope) (out

// Check signing key is available to send an encrypted response
if peer.SigningKey() == nil {
log.Warn().Str("peer", peer.String()).Msg("no remote signing key available")
s.parent.updates.Broadcast(0, "no remote signing key available, key exchange required", pb.MessageCategory_TRISAP2P)
return nil, &protocol.Error{
Code: protocol.NoSigningKey,
Message: "please retry transfer after key exchange",
Retry: true,
log.Warn().Str("peer", peer.String()).Msg("no remote signing key available, attempting key exchange")
s.parent.updates.Broadcast(0, "no remote signing key available, attempting key exchange", pb.MessageCategory_TRISAP2P)

if _, err = peer.ExchangeKeys(false); err != nil {
log.Warn().Err(err).Str("peer", peer.String()).Msg("no remote signing key available, key exchange failed")
s.parent.updates.Broadcast(0, fmt.Sprintf("key exchange failed: %s", err), pb.MessageCategory_TRISAP2P)
}

// Second check for signing keys, if they're not available then reject messages
if peer.SigningKey() == nil {
return nil, &protocol.Error{
Code: protocol.NoSigningKey,
Message: "please retry transfer after key exchange",
Retry: true,
}
}
}

Expand All @@ -171,12 +181,21 @@ func (s *TRISA) TransferStream(stream protocol.TRISANetwork_TransferStreamServer

// Check signing key is available to send an encrypted response
if peer.SigningKey() == nil {
log.Warn().Str("peer", peer.String()).Msg("no remote signing key available")
s.parent.updates.Broadcast(0, "no remote signing key available, key exchange required", pb.MessageCategory_TRISAP2P)
return &protocol.Error{
Code: protocol.NoSigningKey,
Message: "please retry transfer after key exchange",
Retry: true,
log.Warn().Str("peer", peer.String()).Msg("no remote signing key available, attempting key exchange")
s.parent.updates.Broadcast(0, "no remote signing key available, attempting key exchange", pb.MessageCategory_TRISAP2P)

if _, err = peer.ExchangeKeys(false); err != nil {
log.Warn().Err(err).Str("peer", peer.String()).Msg("no remote signing key available, key exchange failed")
s.parent.updates.Broadcast(0, fmt.Sprintf("key exchange failed: %s", err), pb.MessageCategory_TRISAP2P)
}

// Second check for signing keys, if they're not available then reject messages
if peer.SigningKey() == nil {
return &protocol.Error{
Code: protocol.NoSigningKey,
Message: "please retry transfer after key exchange",
Retry: true,
}
}
}

Expand Down Expand Up @@ -633,6 +652,15 @@ func (s *TRISA) sendAsync(tx *db.Transaction) (err error) {
return fmt.Errorf("TRISA protocol error while parsing payload: %s", err)
}

if transaction == nil {
// We expected an echo from the counterparty to conclude an async but got back
// a pending or other type of correctly parsed response.
log.Warn().
Str("transaction_type", payload.Transaction.TypeUrl).
Msg("unexpected transaction reply to async completion")
return fmt.Errorf("received %q payload expected a generic Transaction echo", payload.Transaction.TypeUrl)
}

switch tx.State {
case pb.TransactionState_PENDING_SENT:
// The first handshake is complete so move the transaction to the next state
Expand Down
Binary file added web/placeholder/vaspbot/favicon.ico
Binary file not shown.

0 comments on commit 300168f

Please sign in to comment.