diff --git a/go.mod b/go.mod index 67233e0083..43a5bb8310 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/go-chi/chi v4.0.2+incompatible github.com/google/uuid v1.1.1 github.com/gorilla/securecookie v1.1.1 + github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect github.com/mholt/archiver/v3 v3.3.0 github.com/pkg/profile v1.3.0 github.com/prometheus/client_golang v1.3.0 diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index 025ddb5072..0de35a4ca1 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -367,35 +367,37 @@ func (rg *RouteGroup) keepAliveLoop(interval time.Duration) { continue } - if err := rg.sendKeepAlive(); err != nil { - rg.logger.Warnf("Failed to send keepalive: %v", err) - } + rg.sendKeepAlive() } } } -func (rg *RouteGroup) sendKeepAlive() error { +func (rg *RouteGroup) sendKeepAlive() { rg.mu.Lock() defer rg.mu.Unlock() if len(rg.tps) == 0 || len(rg.fwd) == 0 { // if no transports, no rules, then no keepalive - return nil + return } - tp := rg.tps[0] - rule := rg.fwd[0] + for i := 0; i < len(rg.tps); i++ { + tp := rg.tps[i] + rule := rg.fwd[i] - if tp == nil { - return ErrBadTransport - } + if tp == nil { + continue + } - packet := routing.MakeKeepAlivePacket(rule.NextRouteID()) - if err := rg.writePacket(context.Background(), tp, packet, rule.KeyRouteID()); err != nil { - return err - } + packet := routing.MakeKeepAlivePacket(rule.NextRouteID()) + errCh := rg.writePacketAsync(context.Background(), tp, packet, rule.KeyRouteID()) - return nil + go func() { + if err := <-errCh; err != nil { + rg.logger.WithError(err).Warnf("Failed to send keepalive") + } + }() + } } // Close closes a RouteGroup with the specified close `code`: @@ -490,9 +492,12 @@ func (rg *RouteGroup) handleClosePacket(code routing.CloseCode) error { func (rg *RouteGroup) broadcastClosePackets(code routing.CloseCode) { for i := 0; i < len(rg.tps); i++ { packet := routing.MakeClosePacket(rg.fwd[i].NextRouteID(), code) - if err := rg.writePacket(context.Background(), rg.tps[i], packet, rg.fwd[i].KeyRouteID()); err != nil { - rg.logger.WithError(err).Errorf("Failed to send close packet to %s", rg.tps[i].Remote()) - } + errCh := rg.writePacketAsync(context.Background(), rg.tps[i], packet, rg.fwd[i].KeyRouteID()) + go func(tp *transport.ManagedTransport) { + if err := <-errCh; err != nil { + rg.logger.WithError(err).Errorf("Failed to send close packet to %s", tp.Remote()) + } + }(rg.tps[i]) } } diff --git a/pkg/router/router.go b/pkg/router/router.go index afa11131fc..192d36e396 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -294,24 +294,38 @@ func (r *router) Serve(ctx context.Context) error { } func (r *router) serveTransportManager(ctx context.Context) { + var once sync.Once + done := make(chan struct{}) + for { - packet, err := r.tm.ReadPacket() - if err != nil { - if err == transport.ErrNotServing { - r.logger.WithError(err).Info("Stopped reading packets") - return - } - r.logger.WithError(err).Error("Stopped reading packets due to unexpected error.") + select { + case <-done: return - } - - if err := r.handleTransportPacket(ctx, packet); err != nil { - if err == transport.ErrNotServing { - r.logger.WithError(err).Warnf("Stopped serving Transport.") + default: + packet, err := r.tm.ReadPacket() + if err != nil { + if err == transport.ErrNotServing { + r.logger.WithError(err).Info("Stopped reading packets") + return + } + r.logger.WithError(err).Error("Stopped reading packets due to unexpected error.") return } - r.logger.Warnf("Failed to handle transport frame: %v", err) + go func(packet routing.Packet) { + if err := r.handleTransportPacket(ctx, packet); err != nil { + if err == transport.ErrNotServing { + once.Do(func() { + r.logger.WithError(err).Warnf("Stopped serving Transport.") + close(done) + }) + + return + } + + r.logger.Warnf("Failed to handle transport frame: %v", err) + } + }(packet) } } }