Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/cmd/vtbackup: wait for plugins to finish initializing #14113

Merged
merged 2 commits into from
Sep 28, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions go/cmd/vtbackup/cli/vtbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,14 @@ func run(_ *cobra.Command, args []string) error {
<-ctx.Done()
}()

// Some plugins use OnRun to initialize. Use a channel to signal to
// ourselves that these plugins have been initialized.
runCh := make(chan struct{})
servenv.OnRun(func() {
close(runCh)
})
go servenv.RunDefault()
<-runCh
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems fine to me; it might be a bit more readable (in absence of the comment) to do this:

diff --git a/go/stats/opentsdb/init.go b/go/stats/opentsdb/init.go
index 51186ad765..0782deb8a2 100644
--- a/go/stats/opentsdb/init.go
+++ b/go/stats/opentsdb/init.go
@@ -17,18 +17,24 @@ limitations under the License.
 package opentsdb
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"net/http"
 	"net/url"
 	"sort"
+	"sync"
 
 	"vitess.io/vitess/go/stats"
 	"vitess.io/vitess/go/vt/log"
 	"vitess.io/vitess/go/vt/servenv"
 )
 
-var singletonBackend stats.PushBackend
+var (
+	singletonOnce    sync.Once
+	singletonInit    = make(chan struct{})
+	singletonBackend stats.PushBackend
+)
 
 // Init attempts to create a singleton *opentsdb.backend and register it as a PushBackend.
 // If it fails to create one, this is a noop. The prefix argument is an optional string
@@ -41,12 +47,23 @@ func Init(prefix string) {
 		if err != nil {
 			log.Infof("Failed to initialize singleton opentsdb backend: %v", err)
 		} else {
+			defer singletonOnce.Do(func() { close(singletonInit) })
+
 			singletonBackend = backend
 			log.Info("Initialized opentsdb backend.")
 		}
 	})
 }
 
+func AwaitInit(ctx context.Context) error {
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-singletonInit:
+		return nil
+	}
+}
+
 // InitWithoutServenv initializes the opentsdb without servenv
 func InitWithoutServenv(prefix string) (stats.PushBackend, error) {
 	b, err := newBackend(prefix)

and then here:

go servenv.RunDefault()
if err := opentsdb.AwaitInit(ctx); err != nil {
    return err
}

(or abstract a bit further so we don't expose the particular plugins (i.e. stats.AwaitPlugins(ctx)))

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion, I'll go with the middle option for now (opentsdb.AwaitInit).

Copy link
Collaborator Author

@maxenglander maxenglander Sep 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually going with third option since we don't know if user will select --stats_backend opentsdb or something else.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But opentsdbs OnRun still gets registered either way, no?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ajm188 ah that's true 😳

Do you prefer I switch to that or are you OK with what I pushed up for stats.AwaitBackend?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so i like yours structurally (since it's possible of course to compile vitess without a particular plugin_ file), however, the problem is this close happens during init() and not during servenv.OnRun

we need some mechanism for a plugin to signal back to the main stats package that it's been fully initialized.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something maybe like this:

diff --git a/go/stats/export.go b/go/stats/export.go
index 8bda85c87b..07381f9929 100644
--- a/go/stats/export.go
+++ b/go/stats/export.go
@@ -29,6 +29,7 @@ package stats
 
 import (
 	"bytes"
+	"context"
 	"expvar"
 	"fmt"
 	"strconv"
@@ -218,6 +219,12 @@ func RegisterPushBackend(name string, backend PushBackend) {
 		log.Fatalf("PushBackend %s already exists; can't register the same name multiple times", name)
 	}
 	pushBackends[name] = backend
+	backends[name] = &struct {
+		once sync.Once
+		ch   chan struct{}
+	}{
+		ch: make(chan struct{}),
+	}
 	if emitStats {
 		// Start a single goroutine to emit stats periodically
 		once.Do(func() {
@@ -226,6 +233,36 @@ func RegisterPushBackend(name string, backend PushBackend) {
 	}
 }
 
+var (
+	backends = map[string]*struct {
+		once sync.Once
+		ch   chan struct{}
+	}{}
+)
+
+func InitializeBackend(backend string) {
+	if d, ok := backends[backend]; ok {
+		d.once.Do(func() { close(d.ch) })
+	}
+}
+
+func AwaitBackend(ctx context.Context) error {
+	pushBackendsLock.Lock()
+	defer pushBackendsLock.Unlock()
+
+	d, ok := backends[statsBackend]
+	if !ok {
+		return fmt.Errorf("...")
+	}
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-d.ch:
+		return nil
+	}
+}
+
 // emitToBackend does a periodic emit to the selected PushBackend. If a push fails,
 // it will be logged as a warning (but things will otherwise proceed as normal).
 func emitToBackend(emitPeriod *time.Duration) {
diff --git a/go/stats/opentsdb/init.go b/go/stats/opentsdb/init.go
index 51186ad765..462a442501 100644
--- a/go/stats/opentsdb/init.go
+++ b/go/stats/opentsdb/init.go
@@ -41,6 +41,8 @@ func Init(prefix string) {
 		if err != nil {
 			log.Infof("Failed to initialize singleton opentsdb backend: %v", err)
 		} else {
+			defer stats.Initialize("opentsdb")
+
 			singletonBackend = backend
 			log.Info("Initialized opentsdb backend.")
 		}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ajm188 and I chatted about this part:

the problem is this close happens during init() and not during servenv.OnRun

It turns out opentsdb is a bit special in this regard in that it exposes a Init function that uses servenv.OnRun under the hood. So the full sequence is:

  1. opentsdb.Init("vtbackup")
  2. func Init(prefix string) {
    // Needs to happen in servenv.OnRun() instead of init because it requires flag parsing and logging
    servenv.OnRun(func() {
  3. func Run(port int) {
    populateListeningURL(int32(port))
    createGRPCServer()
    onRunHooks.Fire()
  4. func InitWithoutServenv(prefix string) (stats.PushBackend, error) {
    b, err := newBackend(prefix)
    if err != nil {
    return nil, err
    }
    stats.RegisterPushBackend("opentsdb", b)
  5. vitess/go/stats/export.go

    Lines 228 to 236 in 83932bc

    func RegisterPushBackend(name string, backend PushBackend) {
    pushBackendsLock.Lock()
    defer pushBackendsLock.Unlock()
    if _, ok := pushBackends[name]; ok {
    log.Fatalf("PushBackend %s already exists; can't register the same name multiple times", name)
    }
    pushBackends[name] = backend
    if name == statsBackend {
    close(statsBackendInit)

It's probably worth making the opentsdb package and consumer plugins more like the rest of the code base at some point, but will leave things as they are for now if that's alright.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, you're totally right! this is looking good to me now


if detachedMode {
// this method will call os.Exit and kill this process
Expand Down
Loading