Skip to content

Commit

Permalink
[dbnode] Add server.StorageOptions to TestSetup (#3023)
Browse files Browse the repository at this point in the history
  • Loading branch information
linasm authored and vpranckaitis committed Dec 17, 2020
1 parent 81292d5 commit a8dce25
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 21 deletions.
8 changes: 8 additions & 0 deletions src/dbnode/integration/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/m3db/m3/src/dbnode/network/server/tchannelthrift"
ttcluster "github.com/m3db/m3/src/dbnode/network/server/tchannelthrift/cluster"
ttnode "github.com/m3db/m3/src/dbnode/network/server/tchannelthrift/node"
"github.com/m3db/m3/src/dbnode/server"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage"
"github.com/m3db/m3/src/dbnode/topology"
Expand Down Expand Up @@ -95,6 +96,7 @@ func openAndServe(
db storage.Database,
client client.Client,
opts storage.Options,
serverStorageOpts server.StorageOptions,
doneCh <-chan struct{},
) error {
logger := opts.InstrumentOptions().Logger()
Expand All @@ -106,6 +108,12 @@ func openAndServe(
ttopts := tchannelthrift.NewOptions()
service := ttnode.NewService(db, ttopts)
nodeOpts := ttnode.NewOptions(nil)
if fn := serverStorageOpts.TChanChannelFn; fn != nil {
nodeOpts = nodeOpts.SetTChanChannelFn(fn)
}
if fn := serverStorageOpts.TChanNodeServerFn; fn != nil {
nodeOpts = nodeOpts.SetTChanNodeServerFn(fn)
}
nativeNodeClose, err := ttnode.NewServer(service, tchannelNodeAddr, contextPool, nodeOpts).ListenAndServe()
if err != nil {
return fmt.Errorf("could not open tchannelthrift interface %s: %v", tchannelNodeAddr, err)
Expand Down
44 changes: 23 additions & 21 deletions src/dbnode/integration/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/m3db/m3/src/dbnode/persist/fs/commitlog"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/runtime"
"github.com/m3db/m3/src/dbnode/server"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage"
"github.com/m3db/m3/src/dbnode/storage/block"
Expand All @@ -61,7 +62,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
tchannel "github.com/uber/tchannel-go"
"github.com/uber/tchannel-go"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand All @@ -80,12 +81,8 @@ var (

testSchemaHistory = prototest.NewSchemaHistory()
testSchema = prototest.NewMessageDescriptor(testSchemaHistory)
testSchemaDesc = namespace.GetTestSchemaDescr(testSchema)
testProtoMessages = prototest.NewProtoTestMessages(testSchema)
testProtoEqual = func(expect, actual []byte) bool {
return prototest.ProtoEqual(testSchema, expect, actual)
}
testProtoIter = prototest.NewProtoMessageIterator(testProtoMessages)
testProtoIter = prototest.NewProtoMessageIterator(testProtoMessages)
)

// nowSetterFn is the function that sets the current time
Expand All @@ -105,6 +102,7 @@ type testSetup struct {

db cluster.Database
storageOpts storage.Options
serverStorageOpts server.StorageOptions
fsOpts fs.Options
blockLeaseManager block.LeaseManager
hostID string
Expand Down Expand Up @@ -133,8 +131,10 @@ type testSetup struct {
namespaces []namespace.Metadata

// signals
doneCh chan struct{}
closedCh chan struct{}
doneCh chan struct {
}
closedCh chan struct {
}
}

// TestSetup is a test setup.
Expand All @@ -157,6 +157,7 @@ type TestSetup interface {
FilePathPrefix() string
StorageOpts() storage.Options
SetStorageOpts(storage.Options)
SetServerStorageOpts(server.StorageOptions)
Origin() topology.Host
ServerIsBootstrapped() bool
StopServer() error
Expand Down Expand Up @@ -605,6 +606,10 @@ func (ts *testSetup) SetStorageOpts(opts storage.Options) {
ts.storageOpts = opts
}

func (ts *testSetup) SetServerStorageOpts(opts server.StorageOptions) {
ts.serverStorageOpts = opts
}

func (ts *testSetup) TopologyInitializer() topology.Initializer {
return ts.topoInit
}
Expand Down Expand Up @@ -732,7 +737,7 @@ func (ts *testSetup) startServerBase(waitForBootstrap bool) error {
if err := openAndServe(
ts.httpClusterAddr(), ts.tchannelClusterAddr(),
ts.httpNodeAddr(), ts.tchannelNodeAddr(), ts.httpDebugAddr(),
ts.db, ts.m3dbClient, ts.storageOpts, ts.doneCh,
ts.db, ts.m3dbClient, ts.storageOpts, ts.serverStorageOpts, ts.doneCh,
); err != nil {
select {
case resultCh <- err:
Expand Down Expand Up @@ -1004,22 +1009,19 @@ func newClients(
tchannelNodeAddr string,
) (client.AdminClient, client.AdminClient, error) {
var (
clientOpts = defaultClientOptions(topoInit).
SetClusterConnectTimeout(opts.ClusterConnectionTimeout()).
SetFetchRequestTimeout(opts.FetchRequestTimeout()).
SetWriteConsistencyLevel(opts.WriteConsistencyLevel()).
SetTopologyInitializer(topoInit).
SetUseV2BatchAPIs(true)
clientOpts = defaultClientOptions(topoInit).SetClusterConnectTimeout(
opts.ClusterConnectionTimeout()).
SetFetchRequestTimeout(opts.FetchRequestTimeout()).
SetWriteConsistencyLevel(opts.WriteConsistencyLevel()).
SetTopologyInitializer(topoInit).
SetUseV2BatchAPIs(true)

origin = newOrigin(id, tchannelNodeAddr)
verificationOrigin = newOrigin(id+"-verification", tchannelNodeAddr)

adminOpts = clientOpts.(client.AdminOptions).
SetOrigin(origin).
SetSchemaRegistry(schemaReg)
verificationAdminOpts = adminOpts.
SetOrigin(verificationOrigin).
SetSchemaRegistry(schemaReg)
adminOpts = clientOpts.(client.AdminOptions).SetOrigin(origin).SetSchemaRegistry(schemaReg)

verificationAdminOpts = adminOpts.SetOrigin(verificationOrigin).SetSchemaRegistry(schemaReg)
)

if opts.ProtoEncoding() {
Expand Down

0 comments on commit a8dce25

Please sign in to comment.