Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dmsg delete entry on shutdown #883

Merged
merged 14 commits into from
Sep 16, 2021
4 changes: 2 additions & 2 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,9 @@ func (r *router) serveSetup() {
log := r.logger.WithError(err)
if err == dmsg.ErrEntityClosed {
ersonp marked this conversation as resolved.
Show resolved Hide resolved
log.Info("Setup client stopped serving.")
} else {
log.Error("Setup client stopped serving due to unexpected error.")
return
}
log.Error("Setup client stopped serving due to unexpected error.")
return
}

Expand Down
8 changes: 7 additions & 1 deletion pkg/transport/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,16 @@ func (tm *Manager) acceptTransports(ctx context.Context, lis network.Listener, t
return
default:
if err := tm.acceptTransport(ctx, lis); err != nil {
tm.Logger.Warnf("Failed to accept transport: %v", err)
log := tm.Logger.WithError(err)
if errors.Is(err, dmsg.ErrEntityClosed) {
log.Info("Dmsg client stopped serving.")
return
}
if errors.Is(err, io.ErrClosedPipe) {
return
}
log.Warnf("Failed to accept transport")
return
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/transport/setup/visor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ func NewTransportListener(ctx context.Context, conf *visorconfig.V1, dmsgC *dmsg
log := masterLogger.PackageLogger("transport_setup")
log.WithField("local_pk", conf.PK).Info("Connecting to the dmsg network.")

go dmsgC.Serve(ctx)

select {
case <-dmsgC.Ready():
log.WithField("local_pk", conf.PK).Info("Connected!")
Expand Down Expand Up @@ -62,7 +60,12 @@ func (ts *TransportListener) Serve(ctx context.Context) {
for {
conn, err := lis.AcceptStream()
if err != nil {
ts.log.WithError(err).Error("failed to accept")
log := ts.log.WithError(err)
if err == dmsg.ErrEntityClosed {
ersonp marked this conversation as resolved.
Show resolved Hide resolved
log.Info("Dmsg client stopped serving.")
break
}
log.Error("Failed to accept")
break
}
remotePK := conn.RawRemoteAddr().PK
Expand Down
8 changes: 7 additions & 1 deletion pkg/visor/hypervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1549,7 +1549,13 @@ func (hv *Hypervisor) serveDmsg(ctx context.Context, log *logging.Logger) {
go func() {
<-hv.dmsgC.Ready()
if err := hv.ServeRPC(ctx, hv.c.DmsgPort); err != nil {
log.WithError(err).Fatal("Failed to serve RPC client over dmsg.")
log := log.WithError(err)
if err == dmsg.ErrEntityClosed {
ersonp marked this conversation as resolved.
Show resolved Hide resolved
log.Info("Dmsg client stopped serving.")
return
}
log.Error("Failed to serve RPC client over dmsg.")
return
}
}()
log.WithField("addr", dmsg.Addr{PK: hv.c.PK, Port: hv.c.DmsgPort}).
Expand Down
17 changes: 14 additions & 3 deletions pkg/visor/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,20 @@ func initDmsg(ctx context.Context, v *Visor, log *logging.Logger) error {
}
dmsgC := dmsgc.New(v.conf.PK, v.conf.SK, v.ebc, v.conf.Dmsg)

time.Sleep(200 * time.Millisecond)
go dmsgC.Serve(context.Background())
time.Sleep(200 * time.Millisecond)
wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
defer wg.Done()
dmsgC.Serve(context.Background())
}()

v.pushCloseStack("dmsg", func() error {
if err := dmsgC.Close(); err != nil {
jdknives marked this conversation as resolved.
Show resolved Hide resolved
return err
}
wg.Wait()
return nil
})

v.initLock.Lock()
v.dmsgC = dmsgC
Expand Down