Skip to content

Commit

Permalink
Serve in initL before serving the main cmux listener.
Browse files Browse the repository at this point in the history
  • Loading branch information
adamgee committed Jun 9, 2017
1 parent f149a86 commit e528993
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
19 changes: 16 additions & 3 deletions pkg/server/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,28 @@ type initServer struct {
server *Server
bootstrapped chan struct{}
mux sync.Mutex
waiting bool
}

func newInitServer(s *Server) *initServer {
return &initServer{server: s, bootstrapped: make(chan struct{})}
return &initServer{server: s, bootstrapped: make(chan struct{}), waiting: false}
}

func (s *initServer) startAndAwait(ctx context.Context, ln net.Listener) error {
func (s *initServer) serve(ctx context.Context, ln net.Listener) {
grpcServer := s.server.grpc
serverpb.RegisterInitServer(grpcServer, s)

s.server.stopper.RunWorker(ctx, func(context.Context) {
netutil.FatalIfUnexpected(grpcServer.Serve(ln))
})
}

func (s *initServer) awaitBootstrap() error {
{
s.mux.Lock()
s.waiting = true
s.mux.Unlock()
}

select {
case <-s.server.node.storeCfg.Gossip.Connected:
Expand All @@ -57,8 +66,12 @@ func (s *initServer) startAndAwait(ctx context.Context, ln net.Listener) error {

func (s *initServer) Bootstrap(ctx context.Context, request *serverpb.BootstrapRequest,
) (response *serverpb.BootstrapResponse, err error) {
s.mux.Lock() // Mux bootstrap
s.mux.Lock()
defer s.mux.Unlock()
if !s.waiting {
return nil, errors.New("Init not expecting Bootstrap")
}

if err := s.server.node.bootstrap(ctx, s.server.engines); err != nil {
log.Error(ctx, "Node bootstrap failed: ", err)
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,7 @@ func (s *Server) Start(ctx context.Context) error {
initL := m.Match(func(_ io.Reader) bool {
return atomic.LoadInt32(&readyToServe) == 0
})
s.init.serve(ctx, initL)

pgL := m.Match(pgwire.Match)
anyL := m.Match(cmux.Any())
Expand Down Expand Up @@ -723,7 +724,7 @@ func (s *Server) Start(ctx context.Context) error {
log.Infof(ctx, "**** add additional nodes by specifying --join=%s", s.cfg.AdvertiseAddr)
} else {
log.Info(ctx, "No stores bootstrapped and --join flag specified, starting Init Server.")
if err = s.init.startAndAwait(ctx, initL); err != nil {
if err = s.init.awaitBootstrap(); err != nil {
return nil
}

Expand Down

0 comments on commit e528993

Please sign in to comment.