[#24789][Go SDK] Fix Minor race conditions #24808

Merged · 5 commits · Dec 29, 2022
19 changes: 14 additions & 5 deletions sdks/go/pkg/beam/log/log.go
@@ -22,6 +22,7 @@ import (
"context"
"fmt"
"os"
"sync/atomic"
)

// Severity is the severity of the log message.
@@ -44,23 +45,31 @@ type Logger interface {
Log(ctx context.Context, sev Severity, calldepth int, msg string)
}

var (
logger Logger = &Standard{}
)
var logger atomic.Value

// concreteLogger works around atomic.Value's requirement that the type
// be identical for all callers.
type concreteLogger struct {
Logger
}

func init() {
logger.Store(&concreteLogger{&Standard{}})
}

// SetLogger sets the global Logger. Intended to be called during initialization
// only.
func SetLogger(l Logger) {
if l == nil {
panic("Logger cannot be nil")
}
logger = l
logger.Store(&concreteLogger{l})
}

// Output logs the given message to the global logger. Calldepth is the count
// of the number of frames to skip when computing the file name and line number.
func Output(ctx context.Context, sev Severity, calldepth int, msg string) {
logger.Log(ctx, sev, calldepth+1, msg) // +1 for this frame
logger.Load().(Logger).Log(ctx, sev, calldepth+1, msg) // +1 for this frame
}

// User-facing logging functions.
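For context on the `concreteLogger` wrapper above: `atomic.Value.Store` panics if successive calls pass values of different concrete types, so arbitrary `Logger` implementations cannot be stored directly; wrapping each one in the same concrete struct satisfies that requirement. A minimal standalone sketch of the behavior, using hypothetical names (`logIface`, `stdoutLogger`, `noopLogger`, `wrapper`) rather than the SDK's code:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// logIface, stdoutLogger, and noopLogger are hypothetical stand-ins for the
// SDK's Logger interface and two of its implementations.
type logIface interface{ Log(msg string) }

type stdoutLogger struct{}

func (stdoutLogger) Log(msg string) { fmt.Println(msg) }

type noopLogger struct{}

func (noopLogger) Log(msg string) {}

// wrapper plays the role of concreteLogger: every value handed to Store has
// the same concrete type (wrapper), which atomic.Value requires.
type wrapper struct{ logIface }

func main() {
	var v atomic.Value

	// Storing the implementations directly would panic on the second call:
	//   v.Store(noopLogger{})
	//   v.Store(stdoutLogger{}) // panic: sync/atomic: store of inconsistently typed value
	v.Store(wrapper{noopLogger{}})
	v.Store(wrapper{stdoutLogger{}}) // fine: same concrete type on every Store

	v.Load().(logIface).Log("logger swapped atomically")
}
```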
19 changes: 16 additions & 3 deletions sdks/go/pkg/beam/runners/universal/extworker/extworker.go
@@ -65,6 +65,12 @@ func (s *Loopback) StartWorker(ctx context.Context, req *fnpb.StartWorkerRequest
log.Infof(ctx, "starting worker %v", req.GetWorkerId())
s.mu.Lock()
defer s.mu.Unlock()
if s.workers == nil {
return &fnpb.StartWorkerResponse{
Error: "worker pool shutting down",
}, nil
}

if _, ok := s.workers[req.GetWorkerId()]; ok {
return &fnpb.StartWorkerResponse{
Error: fmt.Sprintf("worker with ID %q already exists", req.GetWorkerId()),
@@ -92,6 +98,10 @@ func (s *Loopback) StopWorker(ctx context.Context, req *fnpb.StopWorkerRequest)
log.Infof(ctx, "stopping worker %v", req.GetWorkerId())
s.mu.Lock()
defer s.mu.Unlock()
if s.workers == nil {
// Worker pool is already shutting down, so no action is needed.
return &fnpb.StopWorkerResponse{}, nil
}
if cancelfn, ok := s.workers[req.GetWorkerId()]; ok {
cancelfn()
delete(s.workers, req.GetWorkerId())
@@ -106,12 +116,15 @@ func (s *Loopback) StopWorker(ctx context.Context, req *fnpb.StopWorkerRequest)
// Stop terminates the service and stops all workers.
func (s *Loopback) Stop(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()

log.Infof(ctx, "stopping Loopback, and %d workers", len(s.workers))
s.workers = map[string]context.CancelFunc{}
s.lis.Close()
s.workers = nil
s.rootCancel()

// There can be a deadlock between the StopWorker RPC and GracefulStop
// which waits for all RPCs to finish, so it must be outside the critical section.
s.mu.Unlock()

s.grpcServer.GracefulStop()
return nil
}
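The comment added in Stop above is the key ordering constraint: GracefulStop blocks until in-flight RPCs (including StopWorker) return, and StopWorker needs s.mu, so holding the mutex across GracefulStop can deadlock. A minimal sketch of that shutdown pattern, with hypothetical names (`pool`, `handler`, and `stopAll` stand in for Loopback, the StopWorker RPC, and GracefulStop; this is not the SDK code):

```go
package main

import (
	"context"
	"sync"
)

// pool is a hypothetical stand-in for the Loopback worker pool; stopAll plays
// the role of grpcServer.GracefulStop, which blocks until handlers return.
type pool struct {
	mu      sync.Mutex
	workers map[string]context.CancelFunc
	stopAll func()
}

// handler plays the role of the StopWorker RPC.
func (p *pool) handler(id string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.workers == nil {
		return // pool already shutting down; nothing to do
	}
	if cancel, ok := p.workers[id]; ok {
		cancel()
		delete(p.workers, id)
	}
}

// Stop mirrors the fixed ordering: mutate state under the lock, release the
// lock, and only then wait for in-flight handlers. Keeping p.mu held across
// stopAll would deadlock with any handler blocked on p.mu.Lock().
func (p *pool) Stop() {
	p.mu.Lock()
	p.workers = nil
	p.mu.Unlock()

	p.stopAll()
}

func main() {
	p := &pool{workers: map[string]context.CancelFunc{}}

	var inflight sync.WaitGroup
	p.stopAll = inflight.Wait // "graceful stop": wait for running handlers

	inflight.Add(1)
	go func() {
		defer inflight.Done()
		p.handler("worker-1") // races with Stop; both orders are now safe
	}()

	p.Stop()
}
```

With the old ordering (a deferred unlock held until Stop returned), the goroutine above could block on p.mu.Lock() while Stop sat in stopAll waiting for it to finish, and neither would make progress.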