Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[M2] Fix default route group timeout #89

Merged
merged 25 commits into from
Jan 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
00f4817
Remove custom timeout timer
Darkren Dec 23, 2019
3cfb825
Fix `writePacketAsync` to handle context properly
Darkren Dec 23, 2019
61b5506
Merge branch 'milestone2' of https://github.com/SkycoinProject/skywir…
Darkren Dec 25, 2019
f45ffec
Add proper handling of `Close` packets by the router
Darkren Dec 25, 2019
e42bc01
Start to implement Close loop
Darkren Dec 25, 2019
5fe03b0
Add proper handling of close packets
Darkren Dec 26, 2019
fceed7a
Add proper EOF return from route group's Read
Darkren Dec 26, 2019
0806d9a
Add closeInitiated flag initialization
Darkren Dec 26, 2019
6dd7c8b
Fix router close packet handling tests
Darkren Dec 27, 2019
972c5ff
Fix close test of route group
Darkren Dec 27, 2019
c70ddf2
Update vendor
Darkren Dec 30, 2019
f21db0e
Rewrite route group's `TestConn`
Darkren Jan 2, 2020
66b3f6b
Fixing route group test conn
Darkren Jan 6, 2020
8c966c9
Almost fix basic IO
Darkren Jan 6, 2020
e489de0
Finally fix the basic io subtest
Darkren Jan 7, 2020
49671fc
Fix basic io?
Darkren Jan 7, 2020
3b7975f
fix
Darkren Jan 9, 2020
4571bd6
Cleanup
Darkren Jan 9, 2020
1dbf3c0
Merge branch 'milestone2' of https://github.com/SkycoinProject/skywir…
Darkren Jan 9, 2020
ae86dae
Fix linter issues, update vendor
Darkren Jan 14, 2020
431885b
Up go version for travis
Darkren Jan 14, 2020
1808b2b
Downgrade go version for travis, remove go 1.13 error wrapping
Darkren Jan 14, 2020
ad399c2
Comment out failing tests
Darkren Jan 14, 2020
145aaa2
Fix visor's `TestListApps`
Darkren Jan 15, 2020
de036b4
Remove finished TODO
Darkren Jan 15, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 160 additions & 65 deletions pkg/router/route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (

const (
defaultRouteGroupKeepAliveInterval = 1 * time.Minute
defaultRouteGroupIOTimeout = 3 * time.Second
defaultReadChBufSize = 1024
closeRoutineTimeout = 2 * time.Second
)

var (
Expand All @@ -46,15 +46,13 @@ func (timeoutError) Temporary() bool { return true }
type RouteGroupConfig struct {
ReadChBufSize int
KeepAliveInterval time.Duration
IOTimeout time.Duration
}

// DefaultRouteGroupConfig returns default RouteGroup config.
// Used by default if config is nil.
func DefaultRouteGroupConfig() *RouteGroupConfig {
return &RouteGroupConfig{
KeepAliveInterval: defaultRouteGroupKeepAliveInterval,
IOTimeout: defaultRouteGroupIOTimeout,
ReadChBufSize: defaultReadChBufSize,
}
}
Expand Down Expand Up @@ -89,11 +87,17 @@ type RouteGroup struct {
readCh chan []byte // push reads from Router
readChMu sync.Mutex
readBuf bytes.Buffer // for read overflow
done chan struct{}
once sync.Once

readDeadline deadline.PipeDeadline
writeDeadline deadline.PipeDeadline

// used as a bool to indicate if this particular route group initiated close loop
closeInitiated int32
remoteClosed int32
closed chan struct{}
// used to wait for all the `Close` packets to run through the loop and come back
closeDone sync.WaitGroup
}

// NewRouteGroup creates a new RouteGroup.
Expand All @@ -112,7 +116,7 @@ func NewRouteGroup(cfg *RouteGroupConfig, rt routing.Table, desc routing.RouteDe
rvs: make([]routing.Rule, 0),
readCh: make(chan []byte, cfg.ReadChBufSize),
readBuf: bytes.Buffer{},
done: make(chan struct{}),
closed: make(chan struct{}),
readDeadline: deadline.MakePipeDeadline(),
writeDeadline: deadline.MakePipeDeadline(),
}
Expand All @@ -139,32 +143,38 @@ func (rg *RouteGroup) Read(p []byte) (n int, err error) {
return 0, nil
}

// In case the read buffer is short.
return rg.read(p)
}

// read reads incoming data. It tries to fetch the data from the internal buffer.
// If buffer is empty it blocks on receiving from the data channel
func (rg *RouteGroup) read(p []byte) (int, error) {
// first try the buffer for any already received data
rg.mu.Lock()
if rg.readBuf.Len() > 0 {
data, err := rg.readBuf.Read(p)
n, err := rg.readBuf.Read(p)
rg.mu.Unlock()

return data, err
return n, err
}
rg.mu.Unlock()

timeout := time.NewTimer(rg.cfg.IOTimeout)
defer timeout.Stop()

var data []byte
select {
case <-rg.readDeadline.Wait():
return 0, timeoutError{}
case <-timeout.C:
return 0, io.EOF
case data = <-rg.readCh:
}
case data, ok := <-rg.readCh:
if !ok || len(data) == 0 {
// route group got closed or empty data received. Behavior on the empty
// data is equivalent to the behavior of `read()` unix syscall as described here:
// https://www.ibm.com/support/knowledgecenter/en/SSLTBW_2.4.0/com.ibm.zos.v2r4.bpxbd00/rtrea.htm
return 0, io.EOF
}

rg.mu.Lock()
defer rg.mu.Unlock()
rg.mu.Lock()
defer rg.mu.Unlock()

return ioutil.BufRead(&rg.readBuf, data, p)
return ioutil.BufRead(&rg.readBuf, data, p)
}
}

// Write writes payload to a RouteGroup
Expand Down Expand Up @@ -197,17 +207,14 @@ func (rg *RouteGroup) Write(p []byte) (n int, err error) {

packet := routing.MakeDataPacket(rule.KeyRouteID(), p)

errCh, cancel := rg.writePacketAsync(tp, packet)
defer cancel()
ctx, cancel := context.WithCancel(context.Background())

timeout := time.NewTimer(rg.cfg.IOTimeout)
defer timeout.Stop()
errCh := rg.writePacketAsync(ctx, tp, packet)
defer cancel()

select {
case <-rg.writeDeadline.Wait():
return 0, timeoutError{}
case <-timeout.C:
return 0, io.EOF
case err := <-errCh:
if err != nil {
return 0, err
Expand All @@ -219,20 +226,14 @@ func (rg *RouteGroup) Write(p []byte) (n int, err error) {
}
}

func (rg *RouteGroup) writePacketAsync(tp *transport.ManagedTransport, packet routing.Packet) (chan error, func()) {
ctx, cancel := context.WithCancel(context.Background())

func (rg *RouteGroup) writePacketAsync(ctx context.Context, tp *transport.ManagedTransport, packet routing.Packet) chan error {
errCh := make(chan error)
evanlinjin marked this conversation as resolved.
Show resolved Hide resolved

go func() {
select {
case <-ctx.Done():
case errCh <- tp.WritePacket(context.Background(), packet):
}
errCh <- tp.WritePacket(ctx, packet)
close(errCh)
}()

return errCh, cancel
return errCh
}

func (rg *RouteGroup) rule() (routing.Rule, error) {
Expand All @@ -259,42 +260,25 @@ func (rg *RouteGroup) tp() (*transport.ManagedTransport, error) {
return tp, nil
}

// Close closes a RouteGroup:
// - Send Close packet for all ForwardRules.
// - Delete all rules (ForwardRules and ConsumeRules) from routing table.
// - Close all go channels.
// Close closes a RouteGroup.
func (rg *RouteGroup) Close() error {
rg.mu.Lock()
defer rg.mu.Unlock()

if len(rg.fwd) != len(rg.tps) {
return ErrRuleTransportMismatch
}

for i := 0; i < len(rg.tps); i++ {
packet := routing.MakeClosePacket(rg.fwd[i].KeyRouteID(), routing.CloseRequested)
if err := rg.tps[i].WritePacket(context.Background(), packet); err != nil {
return err
}
if rg.isClosed() {
return io.ErrClosedPipe
}

rules := rg.rt.RulesWithDesc(rg.desc)
routeIDs := make([]routing.RouteID, 0, len(rules))

for _, rule := range rules {
routeIDs = append(routeIDs, rule.KeyRouteID())
if rg.isRemoteClosed() {
// remote already closed, everything is cleaned up,
// we just need to close signal channel at this point
close(rg.closed)
evanlinjin marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

rg.rt.DelRules(routeIDs)
atomic.StoreInt32(&rg.closeInitiated, 1)

rg.once.Do(func() {
close(rg.done)
rg.readChMu.Lock()
close(rg.readCh)
rg.readChMu.Unlock()
})
rg.mu.Lock()
defer rg.mu.Unlock()

return nil
return rg.close(routing.CloseRequested)
}

// LocalAddr returns destination address of underlying RouteDescriptor.
Expand Down Expand Up @@ -369,11 +353,122 @@ func (rg *RouteGroup) sendKeepAlive() error {
return nil
}

// Close closes a RouteGroup with the specified close `code`:
// - Send Close packet for all ForwardRules with the code `code`.
// - Delete all rules (ForwardRules and ConsumeRules) from routing table.
// - Close all go channels.
func (rg *RouteGroup) close(code routing.CloseCode) error {
if rg.isClosed() {
return nil
}

if len(rg.fwd) != len(rg.tps) {
return ErrRuleTransportMismatch
}

closeInitiator := rg.isCloseInitiator()

if closeInitiator {
// will wait for close response from all the transports
rg.closeDone.Add(len(rg.tps))
}

if err := rg.broadcastClosePackets(code); err != nil {
// TODO: decide if we should return this error, or close route group anyway
return err
}

if closeInitiator {
// if this node initiated closing, we need to wait for close packets
// to come back, or to exit with a timeout if anything goes wrong in
// the network
if err := rg.waitForCloseLoop(closeRoutineTimeout); err != nil {
rg.logger.Errorf("Error during close loop: %v", err)
}
}

rules := make([]routing.RouteID, 0, len(rg.fwd))
for _, r := range rg.fwd {
rules = append(rules, r.KeyRouteID())
}

rg.rt.DelRules(rules)

rg.once.Do(func() {
if closeInitiator {
close(rg.closed)
}

atomic.StoreInt32(&rg.remoteClosed, 1)
rg.readChMu.Lock()
close(rg.readCh)
rg.readChMu.Unlock()
})

return nil
}

func (rg *RouteGroup) handleClosePacket(code routing.CloseCode) error {
rg.logger.Infof("Got close packet with code %d", code)

if rg.isCloseInitiator() {
// this route group initiated close loop and got response
rg.logger.Debugf("Handling response close packet with code %d", code)

rg.closeDone.Done()
return nil
}

// TODO: use `close` with some close code if we decide that it should be different from the current one
return rg.close(code)
}

func (rg *RouteGroup) broadcastClosePackets(code routing.CloseCode) error {
for i := 0; i < len(rg.tps); i++ {
packet := routing.MakeClosePacket(rg.fwd[i].KeyRouteID(), code)
if err := rg.tps[i].WritePacket(context.Background(), packet); err != nil {
// TODO: decide if we should return this error, or close route group anyway
return err
}
}

return nil
}

func (rg *RouteGroup) waitForCloseLoop(waitTimeout time.Duration) error {
closeCtx, closeCancel := context.WithTimeout(context.Background(), waitTimeout)
defer closeCancel()

closeDoneCh := make(chan struct{})
go func() {
// wait till all remotes respond to close procedure
rg.closeDone.Wait()
close(closeDoneCh)
}()

select {
case <-closeCtx.Done():
return fmt.Errorf("close loop timed out: %v", closeCtx.Err())
case <-closeDoneCh:
}

return nil
}

func (rg *RouteGroup) isCloseInitiator() bool {
return atomic.LoadInt32(&rg.closeInitiated) == 1
}

func (rg *RouteGroup) isRemoteClosed() bool {
return atomic.LoadInt32(&rg.remoteClosed) == 1
}

func (rg *RouteGroup) isClosed() bool {
select {
case <-rg.done:
case <-rg.closed:
return true
default:
return false
}

return false
}
Loading