From a1cc68fb8a8aff1514e3c05aae6fafc24a5f3b54 Mon Sep 17 00:00:00 2001 From: Benjamin Bengfort Date: Wed, 15 Jun 2022 13:50:37 -0500 Subject: [PATCH 1/5] panic recovery interceptors --- cmd/rvasp/main.go | 4 +++ pkg/rvasp/interceptor.go | 72 ++++++++++++++++++++++++++++++++++++++++ pkg/rvasp/rvasp.go | 4 +-- pkg/rvasp/trisa.go | 3 +- 4 files changed, 80 insertions(+), 3 deletions(-) create mode 100644 pkg/rvasp/interceptor.go diff --git a/cmd/rvasp/main.go b/cmd/rvasp/main.go index 1162703..a41f985 100644 --- a/cmd/rvasp/main.go +++ b/cmd/rvasp/main.go @@ -255,6 +255,10 @@ func initdb(c *cli.Context) (err error) { conf.Database.DSN = dsn } + if conf.Database.DSN == "" { + return cli.NewExitError("rvasp database dsn required", 1) + } + if fixtures := c.String("fixtures"); fixtures != "" { conf.FixturesPath = fixtures } diff --git a/pkg/rvasp/interceptor.go b/pkg/rvasp/interceptor.go new file mode 100644 index 0000000..64f9d97 --- /dev/null +++ b/pkg/rvasp/interceptor.go @@ -0,0 +1,72 @@ +package rvasp + +import ( + "context" + "fmt" + "runtime/debug" + "time" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// This unary interceptor traces gRPC requests, adds zerolog logging and panic recovery. +func UnaryTraceInterceptor(ctx context.Context, in interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (out interface{}, err error) { + // Track how long the method takes to execute. + start := time.Now() + panicked := true + + // Recover from panics in the handler. + defer func() { + if r := recover(); r != nil || panicked { + log.WithLevel(zerolog.PanicLevel). + Err(fmt.Errorf("%v", r)). + Str("stack_trace", string(debug.Stack())). + Msg("grpc server has recovered from a panic") + err = status.Error(codes.Internal, "an unhandled exception occurred") + } + }() + + // Call the handler to finalize the request and get the response. + out, err = handler(ctx, in) + panicked = false + + // Log with zerolog - checkout grpclog.LoggerV2 for default logging. + log.Debug(). + Err(err). + Str("method", info.FullMethod). + Str("latency", time.Since(start).String()). + Msg("gRPC request complete") + return out, err +} + +// This streaming interceptor traces gRPC requests, adds zerolog logging and panic recovery. +func StreamTraceInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) { + // Track how long the method takes to execute. + start := time.Now() + panicked := true + + defer func() { + if r := recover(); r != nil || panicked { + log.WithLevel(zerolog.PanicLevel). + Err(fmt.Errorf("%v", r)). + Str("stack_trace", string(debug.Stack())). + Msg("grpc server has recovered from a panic") + err = status.Error(codes.Internal, "an unhandled exception occurred") + } + }() + + err = handler(srv, stream) + panicked = false + + // Log with zerolog - checkout grpclog.LoggerV2 for default logging. + log.Debug(). + Err(err). + Str("method", info.FullMethod). + Str("duration", time.Since(start).String()). + Msg("gRPC stream closed") + return err +} diff --git a/pkg/rvasp/rvasp.go b/pkg/rvasp/rvasp.go index 140c6ff..d9d7bfe 100644 --- a/pkg/rvasp/rvasp.go +++ b/pkg/rvasp/rvasp.go @@ -106,8 +106,8 @@ type Server struct { // Serve GRPC requests on the specified address. func (s *Server) Serve() (err error) { - // Initialize the gRPC server - s.srv = grpc.NewServer() + // Initialize the gRPC server with panic recovery and tracing + s.srv = grpc.NewServer(grpc.UnaryInterceptor(UnaryTraceInterceptor), grpc.StreamInterceptor(StreamTraceInterceptor)) pb.RegisterTRISADemoServer(s.srv, s) pb.RegisterTRISAIntegrationServer(s.srv, s) diff --git a/pkg/rvasp/trisa.go b/pkg/rvasp/trisa.go index a90f337..8b77b74 100644 --- a/pkg/rvasp/trisa.go +++ b/pkg/rvasp/trisa.go @@ -88,7 +88,8 @@ func (s *TRISA) Serve() (err error) { return err } - s.srv = grpc.NewServer(creds) + // Create a new gRPC server with panic recovery and tracing middleware + s.srv = grpc.NewServer(creds, grpc.UnaryInterceptor(UnaryTraceInterceptor), grpc.StreamInterceptor(StreamTraceInterceptor)) protocol.RegisterTRISANetworkServer(s.srv, s) protocol.RegisterTRISAHealthServer(s.srv, s) From df3543eec76624b9836d56a2cd384d09abe0b28f Mon Sep 17 00:00:00 2001 From: Benjamin Bengfort Date: Wed, 15 Jun 2022 14:17:35 -0500 Subject: [PATCH 2/5] async handler panic recovery --- pkg/rvasp/async.go | 95 ++++++++++++++++++++++++++++++---------------- 1 file changed, 62 insertions(+), 33 deletions(-) diff --git a/pkg/rvasp/async.go b/pkg/rvasp/async.go index 343c69a..5982867 100644 --- a/pkg/rvasp/async.go +++ b/pkg/rvasp/async.go @@ -1,9 +1,12 @@ package rvasp import ( + "context" "fmt" + "runtime/debug" "time" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/trisacrypto/testnet/pkg/rvasp/db" pb "github.com/trisacrypto/testnet/pkg/rvasp/pb/v1" @@ -13,8 +16,16 @@ import ( // rVASP database and initiates TRISA transfers back to the originator. This allows the // rVASPs to simulate asynchronous transactions. func (s *TRISA) AsyncHandler(stop <-chan struct{}) { - ticker := time.NewTicker(s.parent.conf.AsyncInterval) + var ( + ctx context.Context + cancel context.CancelFunc + ) + + // Create default context and cancel to stop async handler routine before a handler is fired. + _, cancel = context.WithCancel(context.Background()) + // Create the ticker to run aync handlers periodically + ticker := time.NewTicker(s.parent.conf.AsyncInterval) log.Info().Dur("interval", s.parent.conf.AsyncInterval).Msg("asynchronous handler started") for { @@ -22,49 +33,67 @@ func (s *TRISA) AsyncHandler(stop <-chan struct{}) { select { case <-stop: log.Info().Msg("asynchronous handler received stop signal") + cancel() return case <-ticker.C: + cancel() } - log.Info().Msg("checking for pending transactions") + // Execute the handle async go routine with cancellation signals + log.Debug().Msg("checking for pending transactions and handling them") + ctx, cancel = context.WithTimeout(context.Background(), s.parent.conf.AsyncInterval) + go s.handleAsync(ctx) + } +} - // Retrieve all pending messages from the database - var ( - transactions []db.Transaction - err error - ) - if err = s.parent.db.LookupPending().Find(&transactions).Error; err != nil { - log.Error().Err(err).Msg("could not lookup transactions") - continue +func (s *TRISA) handleAsync(ctx context.Context) { + // Recover from panics that occur during async handling. + defer func() { + if r := recover(); r != nil { + log.WithLevel(zerolog.PanicLevel). + Err(fmt.Errorf("%v", r)). + Str("stack_trace", string(debug.Stack())). + Msg("handle async has recovered from a panic") } + }() - now := time.Now() - for _, tx := range transactions { - // Verify pending transaction is old enough - if now.Before(tx.NotBefore) { - continue - } - - // Verify pending transaction has not expired - if now.After(tx.NotAfter) { - log.Info().Uint("id", tx.ID).Time("not_after", tx.NotAfter).Msg("transaction expired") - tx.State = pb.TransactionState_EXPIRED - if err = s.parent.db.Save(&tx).Error; err != nil { - log.Error().Err(err).Uint("id", tx.ID).Msg("could not save expired transaction") - } - continue - } + // Retrieve all pending messages from the database + var ( + transactions []db.Transaction + err error + ) + if err = s.parent.db.LookupPending().Find(&transactions).Error; err != nil { + log.Error().Err(err).Msg("could not lookup transactions") + return + } - // Acknowledge the transaction with the originator - if err = s.acknowledgeTransaction(&tx); err != nil { - log.Error().Err(err).Uint("id", tx.ID).Msg("could not acknowledge transaction") - tx.State = pb.TransactionState_FAILED - } + now := time.Now() +txloop: + for _, tx := range transactions { + // Verify pending transaction is old enough + if now.Before(tx.NotBefore) { + continue + } - // Save the updated transaction in the database + // Verify pending transaction has not expired + if now.After(tx.NotAfter) { + log.Info().Uint("id", tx.ID).Time("not_after", tx.NotAfter).Msg("transaction expired") + tx.State = pb.TransactionState_EXPIRED if err = s.parent.db.Save(&tx).Error; err != nil { - log.Error().Err(err).Uint("id", tx.ID).Msg("could not save completed transaction") + log.Error().Err(err).Uint("id", tx.ID).Msg("could not save expired transaction") } + continue txloop + } + + // Acknowledge the transaction with the originator + if err = s.acknowledgeTransaction(&tx); err != nil { + log.Error().Err(err).Uint("id", tx.ID).Msg("could not acknowledge transaction") + tx.State = pb.TransactionState_FAILED + } + + // Save the updated transaction in the database + if err = s.parent.db.Save(&tx).Error; err != nil { + log.Error().Err(err).Uint("id", tx.ID).Msg("could not save completed transaction") } } } From 9018fea033d9d81521b20e3933cd299d256b3191 Mon Sep 17 00:00:00 2001 From: Benjamin Bengfort Date: Wed, 15 Jun 2022 14:39:19 -0500 Subject: [PATCH 3/5] guess at fix --- pkg/rvasp/trisa.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/rvasp/trisa.go b/pkg/rvasp/trisa.go index 8b77b74..0378ea5 100644 --- a/pkg/rvasp/trisa.go +++ b/pkg/rvasp/trisa.go @@ -634,6 +634,15 @@ func (s *TRISA) sendAsync(tx *db.Transaction) (err error) { return fmt.Errorf("TRISA protocol error while parsing payload: %s", err) } + if transaction == nil { + // We expected an echo from the counterparty to conclude an async but got back + // a pending or other type of correctly parsed response. + log.Warn(). + Str("transaction_type", payload.Transaction.TypeUrl). + Msg("unexpected transaction reply to async completion") + return fmt.Errorf("received %q payload expected a generic Transaction echo", payload.Transaction.TypeUrl) + } + switch tx.State { case pb.TransactionState_PENDING_SENT: // The first handshake is complete so move the transaction to the next state From df0a21a924ca7ead599e610539526d713d0429c5 Mon Sep 17 00:00:00 2001 From: Benjamin Bengfort Date: Wed, 15 Jun 2022 15:25:15 -0500 Subject: [PATCH 4/5] async handler panic recovery again --- containers/docker-compose.yml | 8 +++++++ pkg/rvasp/async.go | 2 +- pkg/rvasp/trisa.go | 42 +++++++++++++++++++++++++---------- 3 files changed, 39 insertions(+), 13 deletions(-) diff --git a/containers/docker-compose.yml b/containers/docker-compose.yml index dc382f0..826bd13 100644 --- a/containers/docker-compose.yml +++ b/containers/docker-compose.yml @@ -66,6 +66,8 @@ services: - RVASP_GDS_URL=gds:4433 - RVASP_GDS_INSECURE=true - RVASP_DATABASE_MAX_RETRIES=5 + - RVASP_LOG_LEVEL + - RVASP_CONSOLE_LOG=true - RVASP_ASYNC_INTERVAL - RVASP_ASYNC_NOT_BEFORE - RVASP_ASYNC_NOT_AFTER @@ -88,6 +90,8 @@ services: - RVASP_GDS_URL=gds:4433 - RVASP_GDS_INSECURE=true - RVASP_DATABASE_MAX_RETRIES=5 + - RVASP_LOG_LEVEL + - RVASP_CONSOLE_LOG=true - RVASP_ASYNC_INTERVAL - RVASP_ASYNC_NOT_BEFORE - RVASP_ASYNC_NOT_AFTER @@ -110,6 +114,8 @@ services: - RVASP_GDS_URL=gds:4433 - RVASP_GDS_INSECURE=true - RVASP_DATABASE_MAX_RETRIES=5 + - RVASP_LOG_LEVEL + - RVASP_CONSOLE_LOG=true - RVASP_ASYNC_INTERVAL - RVASP_ASYNC_NOT_BEFORE - RVASP_ASYNC_NOT_AFTER @@ -132,6 +138,8 @@ services: - RVASP_GDS_URL=gds:4433 - RVASP_GDS_INSECURE=true - RVASP_DATABASE_MAX_RETRIES=5 + - RVASP_LOG_LEVEL + - RVASP_CONSOLE_LOG=true - RVASP_ASYNC_INTERVAL - RVASP_ASYNC_NOT_BEFORE - RVASP_ASYNC_NOT_AFTER diff --git a/pkg/rvasp/async.go b/pkg/rvasp/async.go index 5982867..4ff5da7 100644 --- a/pkg/rvasp/async.go +++ b/pkg/rvasp/async.go @@ -40,7 +40,7 @@ func (s *TRISA) AsyncHandler(stop <-chan struct{}) { } // Execute the handle async go routine with cancellation signals - log.Debug().Msg("checking for pending transactions and handling them") + log.Debug().Msg("checking for pending transactions to handle async") ctx, cancel = context.WithTimeout(context.Background(), s.parent.conf.AsyncInterval) go s.handleAsync(ctx) } diff --git a/pkg/rvasp/trisa.go b/pkg/rvasp/trisa.go index 0378ea5..81ae8ec 100644 --- a/pkg/rvasp/trisa.go +++ b/pkg/rvasp/trisa.go @@ -143,12 +143,21 @@ func (s *TRISA) Transfer(ctx context.Context, in *protocol.SecureEnvelope) (out // Check signing key is available to send an encrypted response if peer.SigningKey() == nil { - log.Warn().Str("peer", peer.String()).Msg("no remote signing key available") - s.parent.updates.Broadcast(0, "no remote signing key available, key exchange required", pb.MessageCategory_TRISAP2P) - return nil, &protocol.Error{ - Code: protocol.NoSigningKey, - Message: "please retry transfer after key exchange", - Retry: true, + log.Warn().Str("peer", peer.String()).Msg("no remote signing key available, attempting key exchange") + s.parent.updates.Broadcast(0, "no remote signing key available, attempting key exchange", pb.MessageCategory_TRISAP2P) + + if _, err = peer.ExchangeKeys(false); err != nil { + log.Warn().Err(err).Str("peer", peer.String()).Msg("no remote signing key available, key exchange failed") + s.parent.updates.Broadcast(0, fmt.Sprintf("key exchange failed: %s", err), pb.MessageCategory_TRISAP2P) + } + + // Second check for signing keys, if they're not available then reject messages + if peer.SigningKey() == nil { + return nil, &protocol.Error{ + Code: protocol.NoSigningKey, + Message: "please retry transfer after key exchange", + Retry: true, + } } } @@ -172,12 +181,21 @@ func (s *TRISA) TransferStream(stream protocol.TRISANetwork_TransferStreamServer // Check signing key is available to send an encrypted response if peer.SigningKey() == nil { - log.Warn().Str("peer", peer.String()).Msg("no remote signing key available") - s.parent.updates.Broadcast(0, "no remote signing key available, key exchange required", pb.MessageCategory_TRISAP2P) - return &protocol.Error{ - Code: protocol.NoSigningKey, - Message: "please retry transfer after key exchange", - Retry: true, + log.Warn().Str("peer", peer.String()).Msg("no remote signing key available, attempting key exchange") + s.parent.updates.Broadcast(0, "no remote signing key available, attempting key exchange", pb.MessageCategory_TRISAP2P) + + if _, err = peer.ExchangeKeys(false); err != nil { + log.Warn().Err(err).Str("peer", peer.String()).Msg("no remote signing key available, key exchange failed") + s.parent.updates.Broadcast(0, fmt.Sprintf("key exchange failed: %s", err), pb.MessageCategory_TRISAP2P) + } + + // Second check for signing keys, if they're not available then reject messages + if peer.SigningKey() == nil { + return &protocol.Error{ + Code: protocol.NoSigningKey, + Message: "please retry transfer after key exchange", + Retry: true, + } } } From daf0302eae9b1b010476e6fdba484c37d80b2849 Mon Sep 17 00:00:00 2001 From: Benjamin Bengfort Date: Wed, 15 Jun 2022 15:28:45 -0500 Subject: [PATCH 5/5] add faviocon to placeholder --- web/placeholder/vaspbot/favicon.ico | Bin 0 -> 15086 bytes 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 web/placeholder/vaspbot/favicon.ico diff --git a/web/placeholder/vaspbot/favicon.ico b/web/placeholder/vaspbot/favicon.ico new file mode 100644 index 0000000000000000000000000000000000000000..447fa4b0e2b93092bdbdd6462f01c3d1fcbe66e9 GIT binary patch literal 15086 zcmeI2d2Ccg9LHZRhagh0qH=^55k&C-3kaevM8pFXL5!lJAS#F_hlvNcF`xtk$fYI< zDBc>Q@P``t1Bwmt2zUg%r7d_PM-|jn*L{AzyKkE5>s$Mv*=9G{O+Nk2@tgVm=6Eyn z=GAcuoIM?U2f!+{|$vLZKHr6miCb(v!dAQX>C1 z-vjv`$oD|H9;kk~XB!v|55og+7O=8p{|`Uq`=by8*JC$w5)@>63-Iwu_zR5hM(!Z3 zcMkZ!IKX$c;a|8a2mD`<#{W3!m*;^0Bk*%yKpQ?Mty>QG-vU4HgFRLowh38$o06I<(F9HsGV_|IY{)!zj?XNPF6J$m(|i{ImV^(S_MO2KZZ7u*Z&8#A9M-wxClx4}p#KA8F_R{)bC0qW;dg6bi?FRTaYX;595O|Iyy(_E+b9eAJ;kV#`Zhg^j3HO2eRDCPow!pc@CtH%<5f(rV zNdGr@6*PWhLC=czLFMTTawpU|M-f+lEduHISx#CVecQf2jP9kdKcJq?q;>S`D^7ZI zI3Lvi=HBQ}!a?=yC4HD(5AkC782r8~uIDSzu3d)-qx*Q8dNi&hj9!o=zb%Y`H6TBO z{88FGyFBqDVQU)wZ%7aQ?t$)8Y3kAZ4)qRzp2lNFntJ5_BA@@-8`If+lf2sJ$u1WZ zM)yY0xl?v*s)+XtDif53Za>)Vt3&l^9LM_0kk%XC1KXy;l`!0|pDvE>5%2=oZIB-~ zL8vnidTroAkgv8sT2r2c!-MRTt~KZum=6zt_D7Zf0o1Rd#sR(da4qN_cODFZP-_5s zCGa%JSM`C`BHd#z1>LO%`ktme{n39P)P_G`CG>(&{TNhdx-{xM6ts420@>60vk>^Q zckn;T9}CZe8Ov>iXL;$3-t(zPen;R%*a!0cJJ37`HQo)ij(Rj7HGg$xRQ(Ciy0jRE zf!43kZ9!LiTz41`S}SWn^=j=`{B+2~7|Z4ys06h^Hh+gtU=>V*OF;Hd1nKtx?fYlL zRWKXg0DTWI&$#BwdKe3#=XXQ>9+Xf0+N+O)sUVwsz_dl}kx!ZjdT;+8^z2JVKH7TP z531oA&{?Y}$X+JXu&H@f22)@KRDkByFQ7K;_t5-MJGO%M6Rjs>pc{l*monK7)r}t- zm&2elXzmPvK^~_eHD(>4Wu|?}*Pri!d=KPC54g#n<1{Q2QP-7TEaJ(Ml&tZ|m=}*n zQu(G_tu*O=>6J?uS>%;B@;om-$&(X3In|SQdvazp>1X2pNJ>VFkkxKHGMgQyJQ^#h zW(6#d)|A9}la@>*WvnDYdUZ4wi6di?1e&xL>ouxpuIpAvR=aLX$(j-xj!ZaC%3gV) zt9bGkUiM;|l)@0G{XeZno%pCI|I3;lV87LVtaI5YXw?1Gfi?}d|@ z+^Tn{PMNmnuOFL-!w0YnPV(nxI_}vw_bL-IZO>o7&;F~#TljS{8Mo|f4(@~|{a)eO z*Z6M)z3)8>P4*6gEzQ48p!d6(@EbJQ|8ZeU_BX?F(0cF=d=5?WEMUjjU!i$Md@$(l zpd7kqa_)Kd>e%!ym8)M!>b8dk8%}jx?EQ2d@wDIK$-4lw=bHUbZLKrr#EU`oYEJG4`C|07Uk z?LK|gf0kcQand_N6#fBYUvi?+Cww*7{%Kr~Hu-)@-eAx?Gxcd5NV^A;r}5Zh>FfTg zpQ)Fy?m8!e?mFKDjjNvPz&c4U1fBa{ftAqT)a94tm4Vg*qW2$q~seausampP7s_SErAKT##m=AZr^>72+1&@N(ot>b*TML&%sP*Lk z{Gkr@rN(R_$cCN|+1c&!6H@0e_0ufS{A{${AfKqW73i7j1*dxSKz4v8J%a@Kmrpye z9!T9QSbufBVx#MoTF6U?61WaV0@o&q>2QzxZer%7ig=`Oc05vntk|B2&W*cnBoT>6 z-Ps#lx0*{2-%s}xx|S$~+Ixy7y!mF%YRx+yx0H_vO0g(`Sxz-Hx#C<_GztGf$MyT^0!h}Z3z5dju7Q!!leSp}DZ&XZ$6`}1gs!4^pV-y7eI2ZAv?hWWxiSA2N{09jYBy0cx literal 0 HcmV?d00001