From 8042ebe82b42b12d419c1e432aa5f71055898d56 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Tue, 12 Sep 2023 19:24:25 +1000 Subject: [PATCH 1/2] feat: publicly expose e2e TestIndexerRunner --- e2e_test.go | 258 +++++++++++--------------------------------- test/testindexer.go | 200 ++++++++++++++++++++++++++++++++++ 2 files changed, 262 insertions(+), 196 deletions(-) create mode 100644 test/testindexer.go diff --git a/e2e_test.go b/e2e_test.go index 5edc562f5..1e43a84a9 100644 --- a/e2e_test.go +++ b/e2e_test.go @@ -4,14 +4,12 @@ package main_test //often because it's non-reproducible. TODO fixme import ( - "bufio" "bytes" "context" "fmt" "io" "net/http" "os" - "os/exec" "path/filepath" "runtime" "strings" @@ -22,6 +20,7 @@ import ( "github.com/ipni/go-libipni/find/model" "github.com/ipni/storetheindex/carstore" "github.com/ipni/storetheindex/config" + "github.com/ipni/storetheindex/test" "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" ) @@ -32,100 +31,6 @@ import ( // We initialize their setup, start the two daemons, and connect the peers. // We then import a CAR file and query its CIDs. -type e2eTestRunner struct { - t *testing.T - dir string - ctx context.Context - env []string - - indexerReady chan struct{} - providerReady chan struct{} - dhstoreReady chan struct{} - providerHasPeer chan struct{} -} - -func (e *e2eTestRunner) run(name string, args ...string) []byte { - e.t.Helper() - - e.t.Logf("run: %s %s", name, strings.Join(args, " ")) - - cmd := exec.CommandContext(e.ctx, name, args...) - cmd.Env = e.env - out, err := cmd.CombinedOutput() - require.NoError(e.t, err, string(out)) - return out -} - -func (e *e2eTestRunner) start(prog string, args ...string) *exec.Cmd { - e.t.Helper() - - name := filepath.Base(prog) - e.t.Logf("run: %s %s", name, strings.Join(args, " ")) - - cmd := exec.CommandContext(e.ctx, prog, args...) - cmd.Env = e.env - - stdout, err := cmd.StdoutPipe() - require.NoError(e.t, err) - cmd.Stderr = cmd.Stdout - - scanner := bufio.NewScanner(stdout) - - go func() { - for scanner.Scan() { - line := scanner.Text() - - // Logging every single line via the test output is verbose, - // but helps see what's happening, especially when the test fails. - e.t.Logf("%s: %s", name, line) - - switch name { - case "storetheindex": - if strings.Contains(line, "Indexer is ready") { - e.indexerReady <- struct{}{} - } - case "provider": - line = strings.ToLower(line) - if strings.Contains(line, "connected to peer successfully") { - e.providerHasPeer <- struct{}{} - } else if strings.Contains(line, "admin http server listening") { - e.providerReady <- struct{}{} - } - case "dhstore": - if strings.Contains(line, "Store opened.") { - e.dhstoreReady <- struct{}{} - } - } - } - }() - - err = cmd.Start() - require.NoError(e.t, err) - return cmd -} - -func (e *e2eTestRunner) stop(cmd *exec.Cmd, timeout time.Duration) { - sig := os.Interrupt - if runtime.GOOS == "windows" { - // Windows can't send SIGINT. - sig = os.Kill - } - err := cmd.Process.Signal(sig) - require.NoError(e.t, err) - - waitErr := make(chan error, 1) - go func() { waitErr <- cmd.Wait() }() - - select { - case <-time.After(timeout): - e.t.Logf("killing command after %s: %s", timeout, cmd) - err = cmd.Process.Kill() - require.NoError(e.t, err) - case err = <-waitErr: - require.NoError(e.t, err) - } -} - func TestEndToEndWithAllProviderTypes(t *testing.T) { if os.Getenv("CI") != "" { t.Skip("Skipping e2e test in CI environment") @@ -159,86 +64,47 @@ func TestEndToEndWithAllProviderTypes(t *testing.T) { func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) defer cancel() - e := &e2eTestRunner{ - t: t, - dir: t.TempDir(), - ctx: ctx, - - indexerReady: make(chan struct{}, 1), - providerReady: make(chan struct{}, 1), - dhstoreReady: make(chan struct{}, 1), - providerHasPeer: make(chan struct{}, 1), - } - carPath := filepath.Join(e.dir, "sample-wrapped-v2.car") + indexerReady := test.NewIndexerReadyWatcher() + providerReady := test.NewProviderReadyWatcher() + providerHasPeer := test.NewProviderHasPeerWatcher() + dhstoreReady := test.NewDhstoreReadyWatcher() + + e := test.NewTestIndexerRunner(t, ctx, t.TempDir(), indexerReady, providerReady, providerHasPeer, dhstoreReady) + + carPath := filepath.Join(e.Dir, "sample-wrapped-v2.car") err := downloadFile("https://github.com/ipni/index-provider/raw/main/testdata/sample-wrapped-v2.car", carPath) require.NoError(t, err) - // Use a clean environment, with the host's PATH, and a temporary HOME. - // We also tell "go install" to place binaries there. - hostEnv := os.Environ() - var filteredEnv []string - for _, env := range hostEnv { - if strings.Contains(env, "CC") || strings.Contains(env, "LDFLAGS") || strings.Contains(env, "CFLAGS") { - // Bring in the C compiler flags from the host. For example on a Nix - // machine, this compilation within the test will fail since the compiler - // will not find correct libraries. - filteredEnv = append(filteredEnv, env) - } else if strings.HasPrefix(env, "PATH") { - // Bring in the host's PATH. - filteredEnv = append(filteredEnv, env) - } - } - e.env = append(filteredEnv, []string{ - "HOME=" + e.dir, - "GOBIN=" + e.dir, - }...) - if runtime.GOOS == "windows" { - const gopath = "C:\\Projects\\Go" - err = os.MkdirAll(gopath, 0666) - require.NoError(t, err) - e.env = append(e.env, fmt.Sprintf("GOPATH=%s", gopath)) - } - t.Logf("Env: %s", strings.Join(e.env, " ")) - - // Reuse the host's build and module download cache. - // This should allow "go install" to reuse work. - for _, name := range []string{"GOCACHE", "GOMODCACHE"} { - out, err := exec.Command("go", "env", name).CombinedOutput() - require.NoError(t, err) - out = bytes.TrimSpace(out) - e.env = append(e.env, fmt.Sprintf("%s=%s", name, out)) - } - // install storetheindex - indexer := filepath.Join(e.dir, "storetheindex") - e.run("go", "install", ".") + indexer := filepath.Join(e.Dir, "storetheindex") + e.Run("go", "install", ".") - provider := filepath.Join(e.dir, "provider") - dhstore := filepath.Join(e.dir, "dhstore") - ipni := filepath.Join(e.dir, "ipni") + provider := filepath.Join(e.Dir, "provider") + dhstore := filepath.Join(e.Dir, "dhstore") + ipni := filepath.Join(e.Dir, "ipni") cwd, err := os.Getwd() require.NoError(t, err) - err = os.Chdir(e.dir) + err = os.Chdir(e.Dir) require.NoError(t, err) // install index-provider switch publisherProto { case "dtsync": // Install index-provider that supports dtsync. - e.run("go", "install", "github.com/ipni/index-provider/cmd/provider@v0.13.6") + e.Run("go", "install", "github.com/ipni/index-provider/cmd/provider@v0.13.6") case "libp2p", "libp2phttp", "http": - e.run("go", "install", "github.com/ipni/index-provider/cmd/provider@latest") + e.Run("go", "install", "github.com/ipni/index-provider/cmd/provider@latest") default: panic("providerProto must be one of: libp2phttp, http, dtsync") } // install dhstore - e.run("go", "install", "-tags", "nofdb", "github.com/ipni/dhstore/cmd/dhstore@latest") + e.Run("go", "install", "-tags", "nofdb", "github.com/ipni/dhstore/cmd/dhstore@latest") // install ipni-cli - e.run("go", "install", "github.com/ipni/ipni-cli/cmd/ipni@latest") + e.Run("go", "install", "github.com/ipni/ipni-cli/cmd/ipni@latest") err = os.Chdir(cwd) require.NoError(t, err) @@ -246,23 +112,23 @@ func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) { // initialize index-provider switch publisherProto { case "dtsync": - e.run(provider, "init") + e.Run(provider, "init") case "http": - e.run(provider, "init", "--pubkind=http") + e.Run(provider, "init", "--pubkind=http") case "libp2p": - e.run(provider, "init", "--pubkind=libp2phttp") + e.Run(provider, "init", "--pubkind=libp2phttp") case "libp2phttp": - e.run(provider, "init", "--pubkind=libp2phttp") + e.Run(provider, "init", "--pubkind=libp2phttp") } - providerCfgPath := filepath.Join(e.dir, ".index-provider", "config") + providerCfgPath := filepath.Join(e.Dir, ".index-provider", "config") cfg, err := config.Load(providerCfgPath) require.NoError(t, err) providerID := cfg.Identity.PeerID t.Logf("Initialized provider ID: %s", providerID) // initialize indexer - e.run(indexer, "init", "--store", "pebble", "--pubsub-topic", "/indexer/ingest/mainnet", "--no-bootstrap") - stiCfgPath := filepath.Join(e.dir, ".storetheindex", "config") + e.Run(indexer, "init", "--store", "pebble", "--pubsub-topic", "/indexer/ingest/mainnet", "--no-bootstrap") + stiCfgPath := filepath.Join(e.Dir, ".storetheindex", "config") cfg, err = config.Load(stiCfgPath) require.NoError(t, err) indexerID := cfg.Identity.PeerID @@ -272,54 +138,54 @@ func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) { Storage: config.FileStore{ Type: "local", Local: config.LocalFileStore{ - BasePath: e.dir, + BasePath: e.Dir, }, }, } cfg.Save(stiCfgPath) // start provider - cmdProvider := e.start(provider, "daemon") + cmdProvider := e.Start(provider, "daemon") select { - case <-e.providerReady: + case <-providerReady.Signal: case <-ctx.Done(): t.Fatal("timed out waiting for provider to start") } // start dhstore - cmdDhstore := e.start(dhstore, "--storePath", e.dir) + cmdDhstore := e.Start(dhstore, "--storePath", e.Dir) select { - case <-e.dhstoreReady: + case <-dhstoreReady.Signal: case <-ctx.Done(): t.Fatal("timed out waiting for dhstore to start") } // start indexer - cmdIndexer := e.start(indexer, "daemon") + cmdIndexer := e.Start(indexer, "daemon") select { - case <-e.indexerReady: + case <-indexerReady.Signal: case <-ctx.Done(): t.Fatal("timed out waiting for indexer to start") } // connect provider to the indexer - e.run(provider, "connect", + e.Run(provider, "connect", "--imaddr", fmt.Sprintf("/dns/localhost/tcp/3003/p2p/%s", indexerID), "--listen-admin", "http://localhost:3102", ) select { - case <-e.providerHasPeer: + case <-providerHasPeer.Signal: case <-ctx.Done(): t.Fatal("timed out waiting for provider to connect to indexer") } // Allow provider advertisements, regardless of default policy. - e.run(indexer, "admin", "allow", "-i", "http://localhost:3002", "--peer", providerID) + e.Run(indexer, "admin", "allow", "-i", "http://localhost:3002", "--peer", providerID) // Import a car file into the provider. This will cause the provider to // publish an advertisement that the indexer will read. The indexer will // then import the advertised content. - outImport := e.run(provider, "import", "car", + outImport := e.Run(provider, "import", "car", "-i", carPath, "--listen-admin", "http://localhost:3102", ) @@ -331,7 +197,7 @@ func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) { "2DrjgbFdhNiSJghFWcQbzw6E8y4jU1Z7ZsWo3dJbYxwGTNFmAj", "2DrjgbFY1BnkgZwA3oL7ijiDn7sJMf4bhhQNTtDqgZP826vGzv", } { - findOutput := e.run(ipni, "find", "--no-priv", "-i", "http://localhost:3000", "-mh", mh) + findOutput := e.Run(ipni, "find", "--no-priv", "-i", "http://localhost:3000", "-mh", mh) t.Logf("import output:\n%s\n", findOutput) if bytes.Contains(findOutput, []byte("not found")) { @@ -345,10 +211,10 @@ func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) { return true }, 10*time.Second, time.Second) - e.run("sync") + e.Run("sync") // Check that ad was saved as CAR file. - dir, err := os.Open(e.dir) + dir, err := os.Open(e.Dir) require.NoError(t, err) names, err := dir.Readdirnames(-1) dir.Close() @@ -365,9 +231,9 @@ func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) { require.Equal(t, 1, carCount) require.Equal(t, 1, headCount) - root2 := filepath.Join(e.dir, ".storetheindex2") - e.env = append(e.env, fmt.Sprintf("%s=%s", config.EnvDir, root2)) - e.run(indexer, "init", "--store", "dhstore", "--pubsub-topic", "/indexer/ingest/mainnet", "--no-bootstrap", "--dhstore", "http://127.0.0.1:40080", + root2 := filepath.Join(e.Dir, ".storetheindex2") + e.Env = append(e.Env, fmt.Sprintf("%s=%s", config.EnvDir, root2)) + e.Run(indexer, "init", "--store", "dhstore", "--pubsub-topic", "/indexer/ingest/mainnet", "--no-bootstrap", "--dhstore", "http://127.0.0.1:40080", "--listen-admin", "/ip4/127.0.0.1/tcp/3202", "--listen-finder", "/ip4/127.0.0.1/tcp/3200", "--listen-ingest", "/ip4/127.0.0.1/tcp/3201", "--listen-p2p", "/ip4/127.0.0.1/tcp/3203") @@ -376,37 +242,37 @@ func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) { require.NoError(t, err) indexer2ID := cfg.Identity.PeerID - cmdIndexer2 := e.start(indexer, "daemon") + cmdIndexer2 := e.Start(indexer, "daemon") select { - case <-e.indexerReady: + case <-indexerReady.Signal: case <-ctx.Done(): t.Fatal("timed out waiting for indexer2 to start") } - outProviders := e.run(ipni, "provider", "--all", "--indexer", "http://localhost:3200") + outProviders := e.Run(ipni, "provider", "--all", "--indexer", "http://localhost:3200") require.Contains(t, string(outProviders), "No providers registered with indexer", "expected no providers message") // import providers from first indexer. - e.run(indexer, "admin", "import-providers", "--indexer", "http://localhost:3202", "--from", "localhost:3000") + e.Run(indexer, "admin", "import-providers", "--indexer", "http://localhost:3202", "--from", "localhost:3000") // Check that provider ID now appears in providers output. - outProviders = e.run(ipni, "provider", "--all", "--indexer", "http://localhost:3200", "--id-only") + outProviders = e.Run(ipni, "provider", "--all", "--indexer", "http://localhost:3200", "--id-only") require.Contains(t, string(outProviders), providerID, "expected provider id in providers output after import-providers") // Connect provider to the 2nd indexer. - e.run(provider, "connect", + e.Run(provider, "connect", "--imaddr", fmt.Sprintf("/dns/localhost/tcp/3203/p2p/%s", indexer2ID), "--listen-admin", "http://localhost:3102", ) select { - case <-e.providerHasPeer: + case <-providerHasPeer.Signal: case <-ctx.Done(): t.Fatal("timed out waiting for provider to connect to indexer") } // Tell provider to send direct announce to 2nd indexer. - out := e.run(provider, "announce-http", + out := e.Run(provider, "announce-http", "-i", "http://localhost:3201", "--listen-admin", "http://localhost:3102", ) @@ -422,7 +288,7 @@ func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) { var dhResp *model.FindResponse require.Eventually(t, func() bool { - dhResp, err = client.Find(e.ctx, mh) + dhResp, err = client.Find(e.Ctx, mh) return err == nil && len(dhResp.MultihashResults) != 0 }, 10*time.Second, time.Second) @@ -435,7 +301,7 @@ func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) { // publish an advertisement that tells the indexer to remove the car file // content by contextID. The indexer will then import the advertisement // and remove content. - outRemove := e.run(provider, "remove", "car", + outRemove := e.Run(provider, "remove", "car", "-i", carPath, "--listen-admin", "http://localhost:3102", ) @@ -447,7 +313,7 @@ func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) { "2DrjgbFdhNiSJghFWcQbzw6E8y4jU1Z7ZsWo3dJbYxwGTNFmAj", "2DrjgbFY1BnkgZwA3oL7ijiDn7sJMf4bhhQNTtDqgZP826vGzv", } { - findOutput := e.run(ipni, "find", "--no-priv", "-i", "http://localhost:3000", "-mh", mh) + findOutput := e.Run(ipni, "find", "--no-priv", "-i", "http://localhost:3000", "-mh", mh) t.Logf("import output:\n%s\n", findOutput) if !bytes.Contains(findOutput, []byte("not found")) { return false @@ -457,24 +323,24 @@ func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) { }, 10*time.Second, time.Second) // Check that status is not frozen. - outStatus := e.run(indexer, "admin", "status", "--indexer", "http://localhost:3202") + outStatus := e.Run(indexer, "admin", "status", "--indexer", "http://localhost:3202") require.Contains(t, string(outStatus), "Frozen: false", "expected indexer to be frozen") - e.run(indexer, "admin", "freeze", "--indexer", "http://localhost:3202") - outProviders = e.run(ipni, "provider", "--all", "--indexer", "http://localhost:3200") + e.Run(indexer, "admin", "freeze", "--indexer", "http://localhost:3202") + outProviders = e.Run(ipni, "provider", "--all", "--indexer", "http://localhost:3200") // Check that provider ID now appears as frozen in providers output. require.Contains(t, string(outProviders), "FrozenAtTime", "expected provider to be frozen") // Check that status is frozen. - outStatus = e.run(indexer, "admin", "status", "--indexer", "http://localhost:3202") + outStatus = e.Run(indexer, "admin", "status", "--indexer", "http://localhost:3202") require.Contains(t, string(outStatus), "Frozen: true", "expected indexer to be frozen") - e.stop(cmdIndexer2, time.Second) + e.Stop(cmdIndexer2, time.Second) - e.stop(cmdIndexer, time.Second) - e.stop(cmdProvider, time.Second) - e.stop(cmdDhstore, time.Second) + e.Stop(cmdIndexer, time.Second) + e.Stop(cmdProvider, time.Second) + e.Stop(cmdDhstore, time.Second) } func downloadFile(fileURL, filePath string) error { diff --git a/test/testindexer.go b/test/testindexer.go new file mode 100644 index 000000000..bf468f5ae --- /dev/null +++ b/test/testindexer.go @@ -0,0 +1,200 @@ +package test + +import ( + "bufio" + "bytes" + "context" + "fmt" + "os" + "os/exec" + "path/filepath" + "runtime" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +const ( + IndexerReadyMatch = "Indexer is ready" + ProviderHasPeerMatch = "connected to peer successfully" + ProviderReadyMatch = "admin http server listening" + DhstoreReady = "Store opened." +) + +// StdoutWatcher is a helper for watching the stdout of a command for a +// specific string. It is used by TestIndexerRunner to watch for specific +// output from the commands. The Signal channel will be sent on when the +// match string is found. +type StdoutWatcher struct { + Prog string + Match string + Signal chan struct{} +} + +func NewStdoutWatcher(prog, match string) StdoutWatcher { + return StdoutWatcher{ + Prog: prog, + Match: match, + Signal: make(chan struct{}, 1), + } +} + +func NewIndexerReadyWatcher() StdoutWatcher { + return NewStdoutWatcher("storetheindex", IndexerReadyMatch) +} + +func NewProviderHasPeerWatcher() StdoutWatcher { + return NewStdoutWatcher("provider", ProviderHasPeerMatch) +} + +func NewProviderReadyWatcher() StdoutWatcher { + return NewStdoutWatcher("provider", ProviderReadyMatch) +} + +func NewDhstoreReadyWatcher() StdoutWatcher { + return NewStdoutWatcher("dhstore", DhstoreReady) +} + +// TestIndexerRunner is a helper for running the indexer and other commands. +// TestIndexerRunner is not specifically tied to the indexer, but is designed +// to be used to manage multiple processes in a test; and is therefore useful +// for testing the indexer, the dhstore, and providers, all in a temporary +// directory and with a test environment. +type TestIndexerRunner struct { + t *testing.T + watchers []StdoutWatcher + + Ctx context.Context + Dir string + Env []string +} + +// NewTestIndexerRunner creates a new TestIndexerRunner for the given test, +// context, and temporary directory. It also takes a list of StdoutWatchers, +// which will be used to watch for specific output from the commands. +func NewTestIndexerRunner(t *testing.T, ctx context.Context, dir string, watchers ...StdoutWatcher) *TestIndexerRunner { + tr := TestIndexerRunner{ + t: t, + watchers: watchers, + + Ctx: ctx, + Dir: dir, + } + + // Use a clean environment, with the host's PATH, and a temporary HOME. + // We also tell "go install" to place binaries there. + hostEnv := os.Environ() + var filteredEnv []string + for _, env := range hostEnv { + if strings.Contains(env, "CC") || strings.Contains(env, "LDFLAGS") || strings.Contains(env, "CFLAGS") { + // Bring in the C compiler flags from the host. For example on a Nix + // machine, this compilation within the test will fail since the compiler + // will not find correct libraries. + filteredEnv = append(filteredEnv, env) + } else if strings.HasPrefix(env, "PATH") { + // Bring in the host's PATH. + filteredEnv = append(filteredEnv, env) + } + } + tr.Env = append(filteredEnv, []string{ + "HOME=" + tr.Dir, + "GOBIN=" + tr.Dir, + }...) + if runtime.GOOS == "windows" { + const gopath = "C:\\Projects\\Go" + err := os.MkdirAll(gopath, 0666) + require.NoError(t, err) + tr.Env = append(tr.Env, fmt.Sprintf("GOPATH=%s", gopath)) + } + t.Logf("Env: %s", strings.Join(tr.Env, " ")) + + // Reuse the host's build and module download cache. + // This should allow "go install" to reuse work. + for _, name := range []string{"GOCACHE", "GOMODCACHE"} { + out, err := exec.Command("go", "env", name).CombinedOutput() + require.NoError(t, err) + out = bytes.TrimSpace(out) + tr.Env = append(tr.Env, fmt.Sprintf("%s=%s", name, out)) + } + + return &tr +} + +// Run runs a command and returns its output. This is useful for executing +// synchronous commands within the temporary environment. +func (tr *TestIndexerRunner) Run(name string, args ...string) []byte { + tr.t.Helper() + + tr.t.Logf("run: %s %s", name, strings.Join(args, " ")) + + cmd := exec.CommandContext(tr.Ctx, name, args...) + cmd.Env = tr.Env + out, err := cmd.CombinedOutput() + require.NoError(tr.t, err, string(out)) + return out +} + +// Start starts a command and returns the command. This is useful for executing +// asynchronous commands within the temporary environment. It will watch the +// command's stdout for the given match string, and send on a watcher's +// channel when/if found. +func (tr *TestIndexerRunner) Start(prog string, args ...string) *exec.Cmd { + tr.t.Helper() + + name := filepath.Base(prog) + tr.t.Logf("run: %s %s", name, strings.Join(args, " ")) + + cmd := exec.CommandContext(tr.Ctx, prog, args...) + cmd.Env = tr.Env + + stdout, err := cmd.StdoutPipe() + require.NoError(tr.t, err) + cmd.Stderr = cmd.Stdout + + scanner := bufio.NewScanner(stdout) + + go func() { + for scanner.Scan() { + line := strings.ToLower(scanner.Text()) + + // Logging every single line via the test output is verbose, + // but helps see what's happening, especially when the test fails. + tr.t.Logf("%s: %s", name, line) + + for _, watcher := range tr.watchers { + if watcher.Prog == name && strings.Contains(line, strings.ToLower(watcher.Match)) { + watcher.Signal <- struct{}{} + } + } + } + }() + + err = cmd.Start() + require.NoError(tr.t, err) + return cmd +} + +// Stop stops a command. It sends SIGINT, and if that doesn't work, SIGKILL. +func (tr *TestIndexerRunner) Stop(cmd *exec.Cmd, timeout time.Duration) { + sig := os.Interrupt + if runtime.GOOS == "windows" { + // Windows can't send SIGINT. + sig = os.Kill + } + err := cmd.Process.Signal(sig) + require.NoError(tr.t, err) + + waitErr := make(chan error, 1) + go func() { waitErr <- cmd.Wait() }() + + select { + case <-time.After(timeout): + tr.t.Logf("killing command after %s: %s", timeout, cmd) + err = cmd.Process.Kill() + require.NoError(tr.t, err) + case err = <-waitErr: + require.NoError(tr.t, err) + } +} From d2bb69c17ad6d47b514d6854d41d280c9761ff2c Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Wed, 13 Sep 2023 16:38:34 +1000 Subject: [PATCH 2/2] fix: update TestIpniRunner api based on feedback --- e2e_test.go | 22 +++--- test/{testindexer.go => testipnirunner.go} | 82 ++++++++++++---------- 2 files changed, 55 insertions(+), 49 deletions(-) rename test/{testindexer.go => testipnirunner.go} (72%) diff --git a/e2e_test.go b/e2e_test.go index 1e43a84a9..86ca6b5bc 100644 --- a/e2e_test.go +++ b/e2e_test.go @@ -65,12 +65,7 @@ func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) defer cancel() - indexerReady := test.NewIndexerReadyWatcher() - providerReady := test.NewProviderReadyWatcher() - providerHasPeer := test.NewProviderHasPeerWatcher() - dhstoreReady := test.NewDhstoreReadyWatcher() - - e := test.NewTestIndexerRunner(t, ctx, t.TempDir(), indexerReady, providerReady, providerHasPeer, dhstoreReady) + e := test.NewTestIpniRunner(t, ctx, t.TempDir()) carPath := filepath.Join(e.Dir, "sample-wrapped-v2.car") err := downloadFile("https://github.com/ipni/index-provider/raw/main/testdata/sample-wrapped-v2.car", carPath) @@ -145,7 +140,9 @@ func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) { cfg.Save(stiCfgPath) // start provider - cmdProvider := e.Start(provider, "daemon") + providerReady := test.NewStdoutWatcher(test.ProviderReadyMatch) + providerHasPeer := test.NewStdoutWatcher(test.ProviderHasPeerMatch) + cmdProvider := e.Start(test.NewExecution(provider, "daemon").WithWatcher(providerReady).WithWatcher(providerHasPeer)) select { case <-providerReady.Signal: case <-ctx.Done(): @@ -153,7 +150,8 @@ func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) { } // start dhstore - cmdDhstore := e.Start(dhstore, "--storePath", e.Dir) + dhstoreReady := test.NewStdoutWatcher(test.DhstoreReady) + cmdDhstore := e.Start(test.NewExecution(dhstore, "--storePath", e.Dir).WithWatcher(dhstoreReady)) select { case <-dhstoreReady.Signal: case <-ctx.Done(): @@ -161,7 +159,8 @@ func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) { } // start indexer - cmdIndexer := e.Start(indexer, "daemon") + indexerReady := test.NewStdoutWatcher(test.IndexerReadyMatch) + cmdIndexer := e.Start(test.NewExecution(indexer, "daemon").WithWatcher(indexerReady)) select { case <-indexerReady.Signal: case <-ctx.Done(): @@ -242,9 +241,10 @@ func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) { require.NoError(t, err) indexer2ID := cfg.Identity.PeerID - cmdIndexer2 := e.Start(indexer, "daemon") + indexerReady2 := test.NewStdoutWatcher(test.IndexerReadyMatch) + cmdIndexer2 := e.Start(test.NewExecution(indexer, "daemon").WithWatcher(indexerReady2)) select { - case <-indexerReady.Signal: + case <-indexerReady2.Signal: case <-ctx.Done(): t.Fatal("timed out waiting for indexer2 to start") } diff --git a/test/testindexer.go b/test/testipnirunner.go similarity index 72% rename from test/testindexer.go rename to test/testipnirunner.go index bf468f5ae..1afc0ae0b 100644 --- a/test/testindexer.go +++ b/test/testipnirunner.go @@ -24,60 +24,40 @@ const ( ) // StdoutWatcher is a helper for watching the stdout of a command for a -// specific string. It is used by TestIndexerRunner to watch for specific +// specific string. It is used by TestIpniRunner to watch for specific // output from the commands. The Signal channel will be sent on when the // match string is found. type StdoutWatcher struct { - Prog string Match string Signal chan struct{} } -func NewStdoutWatcher(prog, match string) StdoutWatcher { +func NewStdoutWatcher(match string) StdoutWatcher { return StdoutWatcher{ - Prog: prog, Match: match, Signal: make(chan struct{}, 1), } } -func NewIndexerReadyWatcher() StdoutWatcher { - return NewStdoutWatcher("storetheindex", IndexerReadyMatch) -} - -func NewProviderHasPeerWatcher() StdoutWatcher { - return NewStdoutWatcher("provider", ProviderHasPeerMatch) -} - -func NewProviderReadyWatcher() StdoutWatcher { - return NewStdoutWatcher("provider", ProviderReadyMatch) -} - -func NewDhstoreReadyWatcher() StdoutWatcher { - return NewStdoutWatcher("dhstore", DhstoreReady) -} - -// TestIndexerRunner is a helper for running the indexer and other commands. -// TestIndexerRunner is not specifically tied to the indexer, but is designed +// TestIpniRunner is a helper for running the indexer and other commands. +// TestIpniRunner is not specifically tied to the indexer, but is designed // to be used to manage multiple processes in a test; and is therefore useful // for testing the indexer, the dhstore, and providers, all in a temporary // directory and with a test environment. -type TestIndexerRunner struct { - t *testing.T - watchers []StdoutWatcher +type TestIpniRunner struct { + t *testing.T Ctx context.Context Dir string Env []string } -// NewTestIndexerRunner creates a new TestIndexerRunner for the given test, +// NewTestIpniRunner creates a new TestIpniRunner for the given test, // context, and temporary directory. It also takes a list of StdoutWatchers, // which will be used to watch for specific output from the commands. -func NewTestIndexerRunner(t *testing.T, ctx context.Context, dir string, watchers ...StdoutWatcher) *TestIndexerRunner { - tr := TestIndexerRunner{ - t: t, - watchers: watchers, +func NewTestIpniRunner(t *testing.T, ctx context.Context, dir string) *TestIpniRunner { + tr := TestIpniRunner{ + t: t, Ctx: ctx, Dir: dir, @@ -124,7 +104,7 @@ func NewTestIndexerRunner(t *testing.T, ctx context.Context, dir string, watcher // Run runs a command and returns its output. This is useful for executing // synchronous commands within the temporary environment. -func (tr *TestIndexerRunner) Run(name string, args ...string) []byte { +func (tr *TestIpniRunner) Run(name string, args ...string) []byte { tr.t.Helper() tr.t.Logf("run: %s %s", name, strings.Join(args, " ")) @@ -136,17 +116,40 @@ func (tr *TestIndexerRunner) Run(name string, args ...string) []byte { return out } +type Execution struct { + Name string + Args []string + Watchers []StdoutWatcher +} + +func NewExecution(name string, args ...string) Execution { + return Execution{ + Name: name, + Args: args, + Watchers: []StdoutWatcher{}, + } +} + +func (p Execution) String() string { + return p.Name + " " + strings.Join(p.Args, " ") +} + +func (p Execution) WithWatcher(watcher StdoutWatcher) Execution { + p.Watchers = append(append([]StdoutWatcher{}, p.Watchers...), watcher) + return p +} + // Start starts a command and returns the command. This is useful for executing // asynchronous commands within the temporary environment. It will watch the // command's stdout for the given match string, and send on a watcher's // channel when/if found. -func (tr *TestIndexerRunner) Start(prog string, args ...string) *exec.Cmd { +func (tr *TestIpniRunner) Start(ex Execution) *exec.Cmd { tr.t.Helper() - name := filepath.Base(prog) - tr.t.Logf("run: %s %s", name, strings.Join(args, " ")) + name := filepath.Base(ex.Name) + tr.t.Logf("run: %s", ex.String()) - cmd := exec.CommandContext(tr.Ctx, prog, args...) + cmd := exec.CommandContext(tr.Ctx, ex.Name, ex.Args...) cmd.Env = tr.Env stdout, err := cmd.StdoutPipe() @@ -155,6 +158,9 @@ func (tr *TestIndexerRunner) Start(prog string, args ...string) *exec.Cmd { scanner := bufio.NewScanner(stdout) + for _, watcher := range ex.Watchers { + tr.t.Logf("watching: %s for [%s]", name, watcher.Match) + } go func() { for scanner.Scan() { line := strings.ToLower(scanner.Text()) @@ -163,8 +169,8 @@ func (tr *TestIndexerRunner) Start(prog string, args ...string) *exec.Cmd { // but helps see what's happening, especially when the test fails. tr.t.Logf("%s: %s", name, line) - for _, watcher := range tr.watchers { - if watcher.Prog == name && strings.Contains(line, strings.ToLower(watcher.Match)) { + for _, watcher := range ex.Watchers { + if strings.Contains(line, strings.ToLower(watcher.Match)) { watcher.Signal <- struct{}{} } } @@ -177,7 +183,7 @@ func (tr *TestIndexerRunner) Start(prog string, args ...string) *exec.Cmd { } // Stop stops a command. It sends SIGINT, and if that doesn't work, SIGKILL. -func (tr *TestIndexerRunner) Stop(cmd *exec.Cmd, timeout time.Duration) { +func (tr *TestIpniRunner) Stop(cmd *exec.Cmd, timeout time.Duration) { sig := os.Interrupt if runtime.GOOS == "windows" { // Windows can't send SIGINT.