Skip to content

Commit

Permalink
Merge pull request #223 from nkryuchkov/feature/async-router-packets
Browse files Browse the repository at this point in the history
Send router packets asynchronously
  • Loading branch information
nkryuchkov authored Mar 12, 2020
2 parents 13b39e8 + b975415 commit 27e1672
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 31 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/go-chi/chi v4.0.2+incompatible
github.com/google/uuid v1.1.1
github.com/gorilla/securecookie v1.1.1
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect
github.com/mholt/archiver/v3 v3.3.0
github.com/pkg/profile v1.3.0
github.com/prometheus/client_golang v1.3.0
Expand Down
41 changes: 23 additions & 18 deletions pkg/router/route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,35 +367,37 @@ func (rg *RouteGroup) keepAliveLoop(interval time.Duration) {
continue
}

if err := rg.sendKeepAlive(); err != nil {
rg.logger.Warnf("Failed to send keepalive: %v", err)
}
rg.sendKeepAlive()
}
}
}

func (rg *RouteGroup) sendKeepAlive() error {
func (rg *RouteGroup) sendKeepAlive() {
rg.mu.Lock()
defer rg.mu.Unlock()

if len(rg.tps) == 0 || len(rg.fwd) == 0 {
// if no transports, no rules, then no keepalive
return nil
return
}

tp := rg.tps[0]
rule := rg.fwd[0]
for i := 0; i < len(rg.tps); i++ {
tp := rg.tps[i]
rule := rg.fwd[i]

if tp == nil {
return ErrBadTransport
}
if tp == nil {
continue
}

packet := routing.MakeKeepAlivePacket(rule.NextRouteID())
if err := rg.writePacket(context.Background(), tp, packet, rule.KeyRouteID()); err != nil {
return err
}
packet := routing.MakeKeepAlivePacket(rule.NextRouteID())
errCh := rg.writePacketAsync(context.Background(), tp, packet, rule.KeyRouteID())

return nil
go func() {
if err := <-errCh; err != nil {
rg.logger.WithError(err).Warnf("Failed to send keepalive")
}
}()
}
}

// Close closes a RouteGroup with the specified close `code`:
Expand Down Expand Up @@ -490,9 +492,12 @@ func (rg *RouteGroup) handleClosePacket(code routing.CloseCode) error {
func (rg *RouteGroup) broadcastClosePackets(code routing.CloseCode) {
for i := 0; i < len(rg.tps); i++ {
packet := routing.MakeClosePacket(rg.fwd[i].NextRouteID(), code)
if err := rg.writePacket(context.Background(), rg.tps[i], packet, rg.fwd[i].KeyRouteID()); err != nil {
rg.logger.WithError(err).Errorf("Failed to send close packet to %s", rg.tps[i].Remote())
}
errCh := rg.writePacketAsync(context.Background(), rg.tps[i], packet, rg.fwd[i].KeyRouteID())
go func(tp *transport.ManagedTransport) {
if err := <-errCh; err != nil {
rg.logger.WithError(err).Errorf("Failed to send close packet to %s", tp.Remote())
}
}(rg.tps[i])
}
}

Expand Down
40 changes: 27 additions & 13 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,24 +294,38 @@ func (r *router) Serve(ctx context.Context) error {
}

func (r *router) serveTransportManager(ctx context.Context) {
var once sync.Once
done := make(chan struct{})

for {
packet, err := r.tm.ReadPacket()
if err != nil {
if err == transport.ErrNotServing {
r.logger.WithError(err).Info("Stopped reading packets")
return
}
r.logger.WithError(err).Error("Stopped reading packets due to unexpected error.")
select {
case <-done:
return
}

if err := r.handleTransportPacket(ctx, packet); err != nil {
if err == transport.ErrNotServing {
r.logger.WithError(err).Warnf("Stopped serving Transport.")
default:
packet, err := r.tm.ReadPacket()
if err != nil {
if err == transport.ErrNotServing {
r.logger.WithError(err).Info("Stopped reading packets")
return
}
r.logger.WithError(err).Error("Stopped reading packets due to unexpected error.")
return
}

r.logger.Warnf("Failed to handle transport frame: %v", err)
go func(packet routing.Packet) {
if err := r.handleTransportPacket(ctx, packet); err != nil {
if err == transport.ErrNotServing {
once.Do(func() {
r.logger.WithError(err).Warnf("Stopped serving Transport.")
close(done)
})

return
}

r.logger.Warnf("Failed to handle transport frame: %v", err)
}
}(packet)
}
}
}
Expand Down

0 comments on commit 27e1672

Please sign in to comment.