diff --git a/cmd/rvasp/main.go b/cmd/rvasp/main.go index 1162703..a41f985 100644 --- a/cmd/rvasp/main.go +++ b/cmd/rvasp/main.go @@ -255,6 +255,10 @@ func initdb(c *cli.Context) (err error) { conf.Database.DSN = dsn } + if conf.Database.DSN == "" { + return cli.NewExitError("rvasp database dsn required", 1) + } + if fixtures := c.String("fixtures"); fixtures != "" { conf.FixturesPath = fixtures } diff --git a/containers/docker-compose.yml b/containers/docker-compose.yml index dc382f0..826bd13 100644 --- a/containers/docker-compose.yml +++ b/containers/docker-compose.yml @@ -66,6 +66,8 @@ services: - RVASP_GDS_URL=gds:4433 - RVASP_GDS_INSECURE=true - RVASP_DATABASE_MAX_RETRIES=5 + - RVASP_LOG_LEVEL + - RVASP_CONSOLE_LOG=true - RVASP_ASYNC_INTERVAL - RVASP_ASYNC_NOT_BEFORE - RVASP_ASYNC_NOT_AFTER @@ -88,6 +90,8 @@ services: - RVASP_GDS_URL=gds:4433 - RVASP_GDS_INSECURE=true - RVASP_DATABASE_MAX_RETRIES=5 + - RVASP_LOG_LEVEL + - RVASP_CONSOLE_LOG=true - RVASP_ASYNC_INTERVAL - RVASP_ASYNC_NOT_BEFORE - RVASP_ASYNC_NOT_AFTER @@ -110,6 +114,8 @@ services: - RVASP_GDS_URL=gds:4433 - RVASP_GDS_INSECURE=true - RVASP_DATABASE_MAX_RETRIES=5 + - RVASP_LOG_LEVEL + - RVASP_CONSOLE_LOG=true - RVASP_ASYNC_INTERVAL - RVASP_ASYNC_NOT_BEFORE - RVASP_ASYNC_NOT_AFTER @@ -132,6 +138,8 @@ services: - RVASP_GDS_URL=gds:4433 - RVASP_GDS_INSECURE=true - RVASP_DATABASE_MAX_RETRIES=5 + - RVASP_LOG_LEVEL + - RVASP_CONSOLE_LOG=true - RVASP_ASYNC_INTERVAL - RVASP_ASYNC_NOT_BEFORE - RVASP_ASYNC_NOT_AFTER diff --git a/pkg/rvasp/async.go b/pkg/rvasp/async.go index 343c69a..4ff5da7 100644 --- a/pkg/rvasp/async.go +++ b/pkg/rvasp/async.go @@ -1,9 +1,12 @@ package rvasp import ( + "context" "fmt" + "runtime/debug" "time" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/trisacrypto/testnet/pkg/rvasp/db" pb "github.com/trisacrypto/testnet/pkg/rvasp/pb/v1" @@ -13,8 +16,16 @@ import ( // rVASP database and initiates TRISA transfers back to the originator. This allows the // rVASPs to simulate asynchronous transactions. func (s *TRISA) AsyncHandler(stop <-chan struct{}) { - ticker := time.NewTicker(s.parent.conf.AsyncInterval) + var ( + ctx context.Context + cancel context.CancelFunc + ) + + // Create default context and cancel to stop async handler routine before a handler is fired. + _, cancel = context.WithCancel(context.Background()) + // Create the ticker to run aync handlers periodically + ticker := time.NewTicker(s.parent.conf.AsyncInterval) log.Info().Dur("interval", s.parent.conf.AsyncInterval).Msg("asynchronous handler started") for { @@ -22,49 +33,67 @@ func (s *TRISA) AsyncHandler(stop <-chan struct{}) { select { case <-stop: log.Info().Msg("asynchronous handler received stop signal") + cancel() return case <-ticker.C: + cancel() } - log.Info().Msg("checking for pending transactions") + // Execute the handle async go routine with cancellation signals + log.Debug().Msg("checking for pending transactions to handle async") + ctx, cancel = context.WithTimeout(context.Background(), s.parent.conf.AsyncInterval) + go s.handleAsync(ctx) + } +} - // Retrieve all pending messages from the database - var ( - transactions []db.Transaction - err error - ) - if err = s.parent.db.LookupPending().Find(&transactions).Error; err != nil { - log.Error().Err(err).Msg("could not lookup transactions") - continue +func (s *TRISA) handleAsync(ctx context.Context) { + // Recover from panics that occur during async handling. + defer func() { + if r := recover(); r != nil { + log.WithLevel(zerolog.PanicLevel). + Err(fmt.Errorf("%v", r)). + Str("stack_trace", string(debug.Stack())). + Msg("handle async has recovered from a panic") } + }() - now := time.Now() - for _, tx := range transactions { - // Verify pending transaction is old enough - if now.Before(tx.NotBefore) { - continue - } - - // Verify pending transaction has not expired - if now.After(tx.NotAfter) { - log.Info().Uint("id", tx.ID).Time("not_after", tx.NotAfter).Msg("transaction expired") - tx.State = pb.TransactionState_EXPIRED - if err = s.parent.db.Save(&tx).Error; err != nil { - log.Error().Err(err).Uint("id", tx.ID).Msg("could not save expired transaction") - } - continue - } + // Retrieve all pending messages from the database + var ( + transactions []db.Transaction + err error + ) + if err = s.parent.db.LookupPending().Find(&transactions).Error; err != nil { + log.Error().Err(err).Msg("could not lookup transactions") + return + } - // Acknowledge the transaction with the originator - if err = s.acknowledgeTransaction(&tx); err != nil { - log.Error().Err(err).Uint("id", tx.ID).Msg("could not acknowledge transaction") - tx.State = pb.TransactionState_FAILED - } + now := time.Now() +txloop: + for _, tx := range transactions { + // Verify pending transaction is old enough + if now.Before(tx.NotBefore) { + continue + } - // Save the updated transaction in the database + // Verify pending transaction has not expired + if now.After(tx.NotAfter) { + log.Info().Uint("id", tx.ID).Time("not_after", tx.NotAfter).Msg("transaction expired") + tx.State = pb.TransactionState_EXPIRED if err = s.parent.db.Save(&tx).Error; err != nil { - log.Error().Err(err).Uint("id", tx.ID).Msg("could not save completed transaction") + log.Error().Err(err).Uint("id", tx.ID).Msg("could not save expired transaction") } + continue txloop + } + + // Acknowledge the transaction with the originator + if err = s.acknowledgeTransaction(&tx); err != nil { + log.Error().Err(err).Uint("id", tx.ID).Msg("could not acknowledge transaction") + tx.State = pb.TransactionState_FAILED + } + + // Save the updated transaction in the database + if err = s.parent.db.Save(&tx).Error; err != nil { + log.Error().Err(err).Uint("id", tx.ID).Msg("could not save completed transaction") } } } diff --git a/pkg/rvasp/interceptor.go b/pkg/rvasp/interceptor.go new file mode 100644 index 0000000..64f9d97 --- /dev/null +++ b/pkg/rvasp/interceptor.go @@ -0,0 +1,72 @@ +package rvasp + +import ( + "context" + "fmt" + "runtime/debug" + "time" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// This unary interceptor traces gRPC requests, adds zerolog logging and panic recovery. +func UnaryTraceInterceptor(ctx context.Context, in interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (out interface{}, err error) { + // Track how long the method takes to execute. + start := time.Now() + panicked := true + + // Recover from panics in the handler. + defer func() { + if r := recover(); r != nil || panicked { + log.WithLevel(zerolog.PanicLevel). + Err(fmt.Errorf("%v", r)). + Str("stack_trace", string(debug.Stack())). + Msg("grpc server has recovered from a panic") + err = status.Error(codes.Internal, "an unhandled exception occurred") + } + }() + + // Call the handler to finalize the request and get the response. + out, err = handler(ctx, in) + panicked = false + + // Log with zerolog - checkout grpclog.LoggerV2 for default logging. + log.Debug(). + Err(err). + Str("method", info.FullMethod). + Str("latency", time.Since(start).String()). + Msg("gRPC request complete") + return out, err +} + +// This streaming interceptor traces gRPC requests, adds zerolog logging and panic recovery. +func StreamTraceInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) { + // Track how long the method takes to execute. + start := time.Now() + panicked := true + + defer func() { + if r := recover(); r != nil || panicked { + log.WithLevel(zerolog.PanicLevel). + Err(fmt.Errorf("%v", r)). + Str("stack_trace", string(debug.Stack())). + Msg("grpc server has recovered from a panic") + err = status.Error(codes.Internal, "an unhandled exception occurred") + } + }() + + err = handler(srv, stream) + panicked = false + + // Log with zerolog - checkout grpclog.LoggerV2 for default logging. + log.Debug(). + Err(err). + Str("method", info.FullMethod). + Str("duration", time.Since(start).String()). + Msg("gRPC stream closed") + return err +} diff --git a/pkg/rvasp/rvasp.go b/pkg/rvasp/rvasp.go index 140c6ff..d9d7bfe 100644 --- a/pkg/rvasp/rvasp.go +++ b/pkg/rvasp/rvasp.go @@ -106,8 +106,8 @@ type Server struct { // Serve GRPC requests on the specified address. func (s *Server) Serve() (err error) { - // Initialize the gRPC server - s.srv = grpc.NewServer() + // Initialize the gRPC server with panic recovery and tracing + s.srv = grpc.NewServer(grpc.UnaryInterceptor(UnaryTraceInterceptor), grpc.StreamInterceptor(StreamTraceInterceptor)) pb.RegisterTRISADemoServer(s.srv, s) pb.RegisterTRISAIntegrationServer(s.srv, s) diff --git a/pkg/rvasp/trisa.go b/pkg/rvasp/trisa.go index a90f337..81ae8ec 100644 --- a/pkg/rvasp/trisa.go +++ b/pkg/rvasp/trisa.go @@ -88,7 +88,8 @@ func (s *TRISA) Serve() (err error) { return err } - s.srv = grpc.NewServer(creds) + // Create a new gRPC server with panic recovery and tracing middleware + s.srv = grpc.NewServer(creds, grpc.UnaryInterceptor(UnaryTraceInterceptor), grpc.StreamInterceptor(StreamTraceInterceptor)) protocol.RegisterTRISANetworkServer(s.srv, s) protocol.RegisterTRISAHealthServer(s.srv, s) @@ -142,12 +143,21 @@ func (s *TRISA) Transfer(ctx context.Context, in *protocol.SecureEnvelope) (out // Check signing key is available to send an encrypted response if peer.SigningKey() == nil { - log.Warn().Str("peer", peer.String()).Msg("no remote signing key available") - s.parent.updates.Broadcast(0, "no remote signing key available, key exchange required", pb.MessageCategory_TRISAP2P) - return nil, &protocol.Error{ - Code: protocol.NoSigningKey, - Message: "please retry transfer after key exchange", - Retry: true, + log.Warn().Str("peer", peer.String()).Msg("no remote signing key available, attempting key exchange") + s.parent.updates.Broadcast(0, "no remote signing key available, attempting key exchange", pb.MessageCategory_TRISAP2P) + + if _, err = peer.ExchangeKeys(false); err != nil { + log.Warn().Err(err).Str("peer", peer.String()).Msg("no remote signing key available, key exchange failed") + s.parent.updates.Broadcast(0, fmt.Sprintf("key exchange failed: %s", err), pb.MessageCategory_TRISAP2P) + } + + // Second check for signing keys, if they're not available then reject messages + if peer.SigningKey() == nil { + return nil, &protocol.Error{ + Code: protocol.NoSigningKey, + Message: "please retry transfer after key exchange", + Retry: true, + } } } @@ -171,12 +181,21 @@ func (s *TRISA) TransferStream(stream protocol.TRISANetwork_TransferStreamServer // Check signing key is available to send an encrypted response if peer.SigningKey() == nil { - log.Warn().Str("peer", peer.String()).Msg("no remote signing key available") - s.parent.updates.Broadcast(0, "no remote signing key available, key exchange required", pb.MessageCategory_TRISAP2P) - return &protocol.Error{ - Code: protocol.NoSigningKey, - Message: "please retry transfer after key exchange", - Retry: true, + log.Warn().Str("peer", peer.String()).Msg("no remote signing key available, attempting key exchange") + s.parent.updates.Broadcast(0, "no remote signing key available, attempting key exchange", pb.MessageCategory_TRISAP2P) + + if _, err = peer.ExchangeKeys(false); err != nil { + log.Warn().Err(err).Str("peer", peer.String()).Msg("no remote signing key available, key exchange failed") + s.parent.updates.Broadcast(0, fmt.Sprintf("key exchange failed: %s", err), pb.MessageCategory_TRISAP2P) + } + + // Second check for signing keys, if they're not available then reject messages + if peer.SigningKey() == nil { + return &protocol.Error{ + Code: protocol.NoSigningKey, + Message: "please retry transfer after key exchange", + Retry: true, + } } } @@ -633,6 +652,15 @@ func (s *TRISA) sendAsync(tx *db.Transaction) (err error) { return fmt.Errorf("TRISA protocol error while parsing payload: %s", err) } + if transaction == nil { + // We expected an echo from the counterparty to conclude an async but got back + // a pending or other type of correctly parsed response. + log.Warn(). + Str("transaction_type", payload.Transaction.TypeUrl). + Msg("unexpected transaction reply to async completion") + return fmt.Errorf("received %q payload expected a generic Transaction echo", payload.Transaction.TypeUrl) + } + switch tx.State { case pb.TransactionState_PENDING_SENT: // The first handshake is complete so move the transaction to the next state diff --git a/web/placeholder/vaspbot/favicon.ico b/web/placeholder/vaspbot/favicon.ico new file mode 100644 index 0000000..447fa4b Binary files /dev/null and b/web/placeholder/vaspbot/favicon.ico differ