Skip to content

Commit

Permalink
Merge branch 'master' into rhall-worker-pool-iter
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanhall07 authored Mar 1, 2021
2 parents c08b57a + 73bac90 commit b7ab449
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 57 deletions.
37 changes: 26 additions & 11 deletions src/dbnode/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ import (
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist/fs"
persistfs "github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/runtime"
"github.com/m3db/m3/src/dbnode/storage"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog"
bfs "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/fs"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/peers"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/uninitialized"
Expand Down Expand Up @@ -118,16 +120,17 @@ func newMultiAddrAdminClient(

// BootstrappableTestSetupOptions defines options for test setups.
type BootstrappableTestSetupOptions struct {
FinalBootstrapper string
BootstrapBlocksBatchSize int
BootstrapBlocksConcurrency int
BootstrapConsistencyLevel topology.ReadConsistencyLevel
TopologyInitializer topology.Initializer
TestStatsReporter xmetrics.TestStatsReporter
DisablePeersBootstrapper bool
UseTChannelClientForWriting bool
EnableRepairs bool
AdminClientCustomOpts []client.CustomAdminOption
FinalBootstrapper string
BootstrapBlocksBatchSize int
BootstrapBlocksConcurrency int
BootstrapConsistencyLevel topology.ReadConsistencyLevel
TopologyInitializer topology.Initializer
TestStatsReporter xmetrics.TestStatsReporter
DisableCommitLogBootstrapper bool
DisablePeersBootstrapper bool
UseTChannelClientForWriting bool
EnableRepairs bool
AdminClientCustomOpts []client.CustomAdminOption
}

type closeFn func()
Expand Down Expand Up @@ -166,6 +169,7 @@ func NewDefaultBootstrappableTestSetups( // nolint:gocyclo
for i := 0; i < replicas; i++ {
var (
instance = i
usingCommitLogBootstrapper = !setupOpts[i].DisableCommitLogBootstrapper
usingPeersBootstrapper = !setupOpts[i].DisablePeersBootstrapper
finalBootstrapperToUse = setupOpts[i].FinalBootstrapper
useTChannelClientForWriting = setupOpts[i].UseTChannelClientForWriting
Expand Down Expand Up @@ -256,7 +260,7 @@ func NewDefaultBootstrappableTestSetups( // nolint:gocyclo
case bootstrapper.NoOpNoneBootstrapperName:
finalBootstrapper = bootstrapper.NewNoOpNoneBootstrapperProvider()
case uninitialized.UninitializedTopologyBootstrapperName:
uninitialized.NewUninitializedTopologyBootstrapperProvider(
finalBootstrapper = uninitialized.NewUninitializedTopologyBootstrapperProvider(
uninitialized.NewOptions().
SetInstrumentOptions(instrumentOpts), nil)
default:
Expand Down Expand Up @@ -303,6 +307,17 @@ func NewDefaultBootstrappableTestSetups( // nolint:gocyclo
require.NoError(t, err)
}

if usingCommitLogBootstrapper {
bootstrapCommitlogOpts := commitlog.NewOptions().
SetResultOptions(bsOpts).
SetCommitLogOptions(setup.StorageOpts().CommitLogOptions()).
SetRuntimeOptionsManager(runtime.NewOptionsManager())

finalBootstrapper, err = commitlog.NewCommitLogBootstrapperProvider(bootstrapCommitlogOpts,
mustInspectFilesystem(fsOpts), finalBootstrapper)
require.NoError(t, err)
}

persistMgr, err := persistfs.NewPersistManager(fsOpts)
require.NoError(t, err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,10 @@ func testPeersBootstrapHighConcurrency(
DisablePeersBootstrapper: true,
},
{
DisablePeersBootstrapper: false,
BootstrapBlocksBatchSize: batchSize,
BootstrapBlocksConcurrency: concurrency,
DisableCommitLogBootstrapper: true,
DisablePeersBootstrapper: false,
BootstrapBlocksBatchSize: batchSize,
BootstrapBlocksConcurrency: concurrency,
},
}
setups, closeFn := NewDefaultBootstrappableTestSetups(t, opts, setupOpts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ func TestPeersBootstrapIndexAggregateQuery(t *testing.T) {

setupOpts := []BootstrappableTestSetupOptions{
{DisablePeersBootstrapper: true},
{DisablePeersBootstrapper: false},
{
DisableCommitLogBootstrapper: true,
DisablePeersBootstrapper: false,
},
}
setups, closeFn := NewDefaultBootstrappableTestSetups(t, opts, setupOpts)
defer closeFn()
Expand Down
5 changes: 4 additions & 1 deletion src/dbnode/integration/peers_bootstrap_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ func TestPeersBootstrapIndexWithIndexingEnabled(t *testing.T) {

setupOpts := []BootstrappableTestSetupOptions{
{DisablePeersBootstrapper: true},
{DisablePeersBootstrapper: false},
{
DisableCommitLogBootstrapper: true,
DisablePeersBootstrapper: false,
},
}
setups, closeFn := NewDefaultBootstrappableTestSetups(t, opts, setupOpts)
defer closeFn()
Expand Down
7 changes: 4 additions & 3 deletions src/dbnode/integration/peers_bootstrap_merge_local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ func testPeersBootstrapMergeLocal(t *testing.T, setTestOpts setTestOptions, upda
UseTChannelClientForWriting: true,
},
{
DisablePeersBootstrapper: false,
UseTChannelClientForWriting: true,
TestStatsReporter: reporter,
DisableCommitLogBootstrapper: true,
DisablePeersBootstrapper: false,
UseTChannelClientForWriting: true,
TestStatsReporter: reporter,
},
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ func testPeersBootstrapMergePeerBlocks(t *testing.T, setTestOpts setTestOptions,
setupOpts := []BootstrappableTestSetupOptions{
{DisablePeersBootstrapper: true},
{DisablePeersBootstrapper: true},
{DisablePeersBootstrapper: false},
{
DisableCommitLogBootstrapper: true,
DisablePeersBootstrapper: false,
},
}
setups, closeFn := NewDefaultBootstrappableTestSetups(t, opts, setupOpts)
defer closeFn()
Expand Down
5 changes: 4 additions & 1 deletion src/dbnode/integration/peers_bootstrap_node_down_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ func TestPeersBootstrapNodeDown(t *testing.T) {
setupOpts := []BootstrappableTestSetupOptions{
{DisablePeersBootstrapper: true},
{DisablePeersBootstrapper: true},
{DisablePeersBootstrapper: false},
{
DisableCommitLogBootstrapper: true,
DisablePeersBootstrapper: false,
},
}
setups, closeFn := NewDefaultBootstrappableTestSetups(t, opts, setupOpts)
defer closeFn()
Expand Down
33 changes: 11 additions & 22 deletions src/dbnode/integration/peers_bootstrap_none_available_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@
package integration

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/m3db/m3/src/cluster/services"
"github.com/m3db/m3/src/cluster/shard"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/uninitialized"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/dbnode/topology/testutil"
xtest "github.com/m3db/m3/src/x/test"
Expand Down Expand Up @@ -94,37 +94,26 @@ func TestPeersBootstrapNoneAvailable(t *testing.T) {
{
DisablePeersBootstrapper: false,
TopologyInitializer: topoInit,
FinalBootstrapper: bootstrapper.NoOpAllBootstrapperName,
FinalBootstrapper: uninitialized.UninitializedTopologyBootstrapperName,
},
{
DisablePeersBootstrapper: false,
TopologyInitializer: topoInit,
FinalBootstrapper: bootstrapper.NoOpAllBootstrapperName,
FinalBootstrapper: uninitialized.UninitializedTopologyBootstrapperName,
},
}
setups, closeFn := NewDefaultBootstrappableTestSetups(t, opts, setupOpts)
defer closeFn()

serversAreUp := &sync.WaitGroup{}
serversAreUp.Add(2)

// Start both servers "simultaneously"
go func() {
if err := setups[0].StartServer(); err != nil {
panic(err)
}
serversAreUp.Done()
}()
go func() {
if err := setups[1].StartServer(); err != nil {
panic(err)
}
serversAreUp.Done()
}()

serversAreUp.Wait()
setups.parallel(func(s TestSetup) {
s.StartServer()
})
log.Debug("servers are now up")

for i, s := range setups {
assert.True(t, s.ServerIsBootstrapped(), "setups[%v] should be bootstrapped", i)
}

// Stop the servers
defer func() {
setups.parallel(func(s TestSetup) {
Expand Down
5 changes: 4 additions & 1 deletion src/dbnode/integration/peers_bootstrap_partial_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ func TestPeersBootstrapPartialData(t *testing.T) {

setupOpts := []BootstrappableTestSetupOptions{
{DisablePeersBootstrapper: true},
{DisablePeersBootstrapper: false},
{
DisableCommitLogBootstrapper: true,
DisablePeersBootstrapper: false,
},
}
setups, closeFn := NewDefaultBootstrappableTestSetups(t, opts, setupOpts)
defer closeFn()
Expand Down
5 changes: 4 additions & 1 deletion src/dbnode/integration/peers_bootstrap_select_best_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ func TestPeersBootstrapSelectBest(t *testing.T) {
setupOpts := []BootstrappableTestSetupOptions{
{DisablePeersBootstrapper: true},
{DisablePeersBootstrapper: true},
{DisablePeersBootstrapper: false},
{
DisableCommitLogBootstrapper: true,
DisablePeersBootstrapper: false,
},
}
setups, closeFn := NewDefaultBootstrappableTestSetups(t, opts, setupOpts)
defer closeFn()
Expand Down
5 changes: 4 additions & 1 deletion src/dbnode/integration/peers_bootstrap_simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ func testPeersBootstrapSimple(t *testing.T, setTestOpts setTestOptions, updateIn

setupOpts := []BootstrappableTestSetupOptions{
{DisablePeersBootstrapper: true},
{DisablePeersBootstrapper: false},
{
DisableCommitLogBootstrapper: true,
DisablePeersBootstrapper: false,
},
}
setups, closeFn := NewDefaultBootstrappableTestSetups(t, opts, setupOpts)
defer closeFn()
Expand Down
70 changes: 59 additions & 11 deletions src/dbnode/integration/peers_bootstrap_single_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,73 @@ import (
"testing"
"time"

"github.com/m3db/m3/src/cluster/services"
"github.com/m3db/m3/src/cluster/shard"
"github.com/m3db/m3/src/dbnode/integration/generate"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/uninitialized"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/dbnode/topology/testutil"
xtest "github.com/m3db/m3/src/x/test"

"github.com/stretchr/testify/require"
)

// TestPeersBootstrapSingleNode makes sure that we can include the peer bootstrapper
// in a single-node topology without causing a bootstrap failure or infinite hang.
func TestPeersBootstrapSingleNode(t *testing.T) {
// TestPeersBootstrapSingleNodeUninitialized makes sure that we can include the peer bootstrapper
// in a single-node topology of a non-initialized cluster without causing a bootstrap failure or infinite hang.
func TestPeersBootstrapSingleNodeUninitialized(t *testing.T) {
opts := NewTestOptions(t)

// Define a topology with initializing shards
minShard := uint32(0)
maxShard := uint32(opts.NumShards()) - uint32(1)
instances := []services.ServiceInstance{
node(t, 0, newClusterShardsRange(minShard, maxShard, shard.Initializing)),
}

hostShardSets := []topology.HostShardSet{}
for _, instance := range instances {
h, err := topology.NewHostShardSetFromServiceInstance(instance, sharding.DefaultHashFn(opts.NumShards()))
require.NoError(t, err)
hostShardSets = append(hostShardSets, h)
}

shards := testutil.ShardsRange(minShard, maxShard, shard.Initializing)
shardSet, err := sharding.NewShardSet(
shards,
sharding.DefaultHashFn(int(maxShard)),
)
require.NoError(t, err)

topoOpts := topology.NewStaticOptions().
SetReplicas(len(instances)).
SetHostShardSets(hostShardSets).
SetShardSet(shardSet)
topoInit := topology.NewStaticInitializer(topoOpts)

setupOpts := []BootstrappableTestSetupOptions{
{
DisablePeersBootstrapper: false,
TopologyInitializer: topoInit,
// This will bootstrap w/ unfulfilled ranges.
FinalBootstrapper: uninitialized.UninitializedTopologyBootstrapperName,
},
}
testPeersBootstrapSingleNode(t, setupOpts)
}

// TestPeersBootstrapSingleNodeInitialized makes sure that we can include the peer bootstrapper
// in a single-node topology of already initialized cluster without causing a bootstrap failure or infinite hang.
func TestPeersBootstrapSingleNodeInitialized(t *testing.T) {
setupOpts := []BootstrappableTestSetupOptions{
{DisablePeersBootstrapper: false},
}
testPeersBootstrapSingleNode(t, setupOpts)
}

func testPeersBootstrapSingleNode(t *testing.T, setupOpts []BootstrappableTestSetupOptions) {
if testing.Short() {
t.SkipNow()
}
Expand All @@ -58,13 +113,6 @@ func TestPeersBootstrapSingleNode(t *testing.T) {
SetUseTChannelClientForWriting(true).
SetUseTChannelClientForReading(true)

setupOpts := []BootstrappableTestSetupOptions{
{
DisablePeersBootstrapper: false,
// This will bootstrap w/ unfulfilled ranges.
FinalBootstrapper: bootstrapper.NoOpAllBootstrapperName,
},
}
setups, closeFn := NewDefaultBootstrappableTestSetups(t, opts, setupOpts)
defer closeFn()

Expand Down

0 comments on commit b7ab449

Please sign in to comment.