Skip to content

Commit

Permalink
Randomize test ports
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanos committed Nov 30, 2024
1 parent 7c9e7c1 commit 9f3131c
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 81 deletions.
3 changes: 1 addition & 2 deletions common/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,8 @@ func (d *RPCFactory) GetGRPCListener() net.Listener {

func (d *RPCFactory) createGRPCListener() net.Listener {
hostAddress := net.JoinHostPort(getListenIP(d.config, d.logger).String(), convert.IntToString(d.config.GRPCPort))
var err error
grpcListener, err := net.Listen("tcp", hostAddress)

grpcListener, err := net.Listen("tcp", hostAddress)
if err != nil || grpcListener == nil || grpcListener.Addr() == nil {
d.logger.Fatal("Failed to start gRPC listener", tag.Error(err), tag.Service(d.serviceName), tag.Address(hostAddress))
}
Expand Down
104 changes: 30 additions & 74 deletions tests/testcore/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,6 @@ import (
"google.golang.org/grpc"
)

const (
frontendPort = 7134
frontendHTTPPort = 7144
historyPort = 7132
matchingPort = 7136
workerPort = 7138 // not really listening
)

type (
TemporalImpl struct {
fxApps []*fx.App
Expand Down Expand Up @@ -118,7 +110,6 @@ type (
namespaceReplicationQueue persistence.NamespaceReplicationQueue
abstractDataStoreFactory persistenceClient.AbstractDataStoreFactory
visibilityStoreFactory visibility.VisibilityStoreFactory
clusterNo int // cluster number
archiverMetadata carchiver.ArchivalMetadata
archiverProvider provider.ArchiverProvider
frontendConfig FrontendConfig
Expand All @@ -132,7 +123,7 @@ type (
spanExporters []otelsdktrace.SpanExporter
tlsConfigProvider *encryption.FixedTLSConfigProvider
captureMetricsHandler *metricstest.CaptureHandler
hostsByService map[primitives.ServiceName]static.Hosts
hostsByService map[string]map[primitives.ServiceName]static.Hosts

onGetClaims func(*authorization.AuthInfo) (*authorization.Claims, error)
onAuthorize func(context.Context, *authorization.Claims, *authorization.CallTarget) (authorization.Result, error)
Expand Down Expand Up @@ -186,7 +177,6 @@ type (
AbstractDataStoreFactory persistenceClient.AbstractDataStoreFactory
VisibilityStoreFactory visibility.VisibilityStoreFactory
Logger log.Logger
ClusterNo int
ArchiverMetadata carchiver.ArchivalMetadata
ArchiverProvider provider.ArchiverProvider
EnableReadHistoryFromArchival bool
Expand All @@ -205,6 +195,7 @@ type (
// ServiceFxOptions is populated by WithFxOptionsForService.
ServiceFxOptions map[primitives.ServiceName][]fx.Option
TaskCategoryRegistry tasks.TaskCategoryRegistry
HostsByService map[string]map[primitives.ServiceName]static.Hosts
}

listenHostPort string
Expand Down Expand Up @@ -253,7 +244,6 @@ func newTemporal(t *testing.T, params *TemporalParams) *TemporalImpl {
namespaceReplicationQueue: params.NamespaceReplicationQueue,
abstractDataStoreFactory: params.AbstractDataStoreFactory,
visibilityStoreFactory: params.VisibilityStoreFactory,
clusterNo: params.ClusterNo,
esConfig: params.ESConfig,
esClient: params.ESClient,
archiverMetadata: params.ArchiverMetadata,
Expand All @@ -270,23 +260,7 @@ func newTemporal(t *testing.T, params *TemporalParams) *TemporalImpl {
dcClient: dynamicconfig.NewMemoryClient(),
serviceFxOptions: params.ServiceFxOptions,
taskCategoryRegistry: params.TaskCategoryRegistry,
}

// set defaults
const minNodes = 1
impl.frontendConfig.NumFrontendHosts = max(minNodes, impl.frontendConfig.NumFrontendHosts)
impl.historyConfig.NumHistoryHosts = max(minNodes, impl.historyConfig.NumHistoryHosts)
impl.matchingConfig.NumMatchingHosts = max(minNodes, impl.matchingConfig.NumMatchingHosts)
impl.workerConfig.NumWorkers = max(minNodes, impl.workerConfig.NumWorkers)
if impl.workerConfig.DisableWorker {
impl.workerConfig.NumWorkers = 0
}

impl.hostsByService = map[primitives.ServiceName]static.Hosts{
primitives.FrontendService: static.Hosts{All: impl.FrontendGRPCAddresses()},
primitives.MatchingService: static.Hosts{All: impl.MatchingServiceAddresses()},
primitives.HistoryService: static.Hosts{All: impl.HistoryServiceAddresses()},
primitives.WorkerService: static.Hosts{All: impl.WorkerServiceAddresses()},
hostsByService: params.HostsByService,
}

for k, v := range staticOverrides {
Expand Down Expand Up @@ -329,56 +303,26 @@ func (c *TemporalImpl) Stop() error {
}

func (c *TemporalImpl) makeHostMap(serviceName primitives.ServiceName, self string) map[primitives.ServiceName]static.Hosts {
hostMap := maps.Clone(c.hostsByService)
hostMap := maps.Clone(c.hostsByService["grpc"])
hosts := hostMap[serviceName]
hosts.Self = self
hostMap[serviceName] = hosts
return hostMap
}

func (c *TemporalImpl) makeGRPCAddresses(num, port int) []string {
hosts := make([]string, num)
for i := range hosts {
hosts[i] = fmt.Sprintf("127.0.%d.%d:%d", c.clusterNo, i+1, port)
}
return hosts
}

func (c *TemporalImpl) FrontendGRPCAddresses() []string {
return c.makeGRPCAddresses(c.frontendConfig.NumFrontendHosts, frontendPort)
}

// Use this to get an address for the Go SDK to connect to.
func (c *TemporalImpl) FrontendGRPCAddress() string {
return c.frontendMembershipAddress
}

// Use this to get an address for a remote cluster to connect to.
func (c *TemporalImpl) RemoteFrontendGRPCAddress() string {
return c.FrontendGRPCAddresses()[0]
return c.hostsByService["grpc"][primitives.FrontendService].All[0]
}

func (c *TemporalImpl) FrontendHTTPAddress() string {
// randomize like a load balancer would
addrs := c.FrontendGRPCAddresses()
addr := addrs[rand.Intn(len(addrs))]
host, _, err := net.SplitHostPort(addr)
if err != nil {
panic(fmt.Errorf("Invalid gRPC frontend address: %w", err))
}
return net.JoinHostPort(host, strconv.Itoa(frontendHTTPPort))
}

func (c *TemporalImpl) HistoryServiceAddresses() []string {
return c.makeGRPCAddresses(c.historyConfig.NumHistoryHosts, historyPort)
addrs := c.hostsByService["http"][primitives.FrontendService].All
return addrs[rand.Intn(len(addrs))]
}

func (c *TemporalImpl) MatchingServiceAddresses() []string {
return c.makeGRPCAddresses(c.matchingConfig.NumMatchingHosts, matchingPort)
}

func (c *TemporalImpl) WorkerServiceAddresses() []string {
return c.makeGRPCAddresses(c.workerConfig.NumWorkers, workerPort)
func (c *TemporalImpl) FrontendGRPCAddress() string {
return c.hostsByService["grpc"][primitives.FrontendService].All[0]
}

func (c *TemporalImpl) AdminClient() adminservice.AdminServiceClient {
Expand Down Expand Up @@ -430,7 +374,7 @@ func (c *TemporalImpl) startFrontend() {
var matchingRawClient resource.MatchingRawClient
var grpcResolver *membership.GRPCResolver

for _, host := range c.hostsByService[serviceName].All {
for _, host := range c.hostsByService["grpc"][serviceName].All {
logger := log.With(c.logger, tag.Host(host))
var namespaceRegistry namespace.Registry
app := fx.New(
Expand All @@ -441,7 +385,7 @@ func (c *TemporalImpl) startFrontend() {
),
fx.Provide(c.frontendConfigProvider),
fx.Provide(func() listenHostPort { return listenHostPort(host) }),
fx.Provide(func() httpPort { return httpPort(frontendHTTPPort) }),
fx.Provide(func() httpPort { return portFromAddress(c.FrontendHTTPAddress()) }),
fx.Provide(func() config.DCRedirectionPolicy { return config.DCRedirectionPolicy{} }),
fx.Provide(func() log.Logger { return logger }),
fx.Provide(func() log.ThrottledLogger { return logger }),
Expand Down Expand Up @@ -508,7 +452,7 @@ func (c *TemporalImpl) startFrontend() {
func (c *TemporalImpl) startHistory() {
serviceName := primitives.HistoryService

for _, host := range c.hostsByService[serviceName].All {
for _, host := range c.hostsByService["grpc"][serviceName].All {
logger := log.With(c.logger, tag.Host(host))
app := fx.New(
fx.Supply(
Expand All @@ -518,7 +462,7 @@ func (c *TemporalImpl) startHistory() {
),
fx.Provide(c.GetMetricsHandler),
fx.Provide(func() listenHostPort { return listenHostPort(host) }),
fx.Provide(func() httpPort { return httpPort(frontendHTTPPort) }),
fx.Provide(func() httpPort { return portFromAddress(c.FrontendHTTPAddress()) }),
fx.Provide(func() config.DCRedirectionPolicy { return config.DCRedirectionPolicy{} }),
fx.Provide(func() log.Logger { return logger }),
fx.Provide(func() log.ThrottledLogger { return logger }),
Expand Down Expand Up @@ -565,7 +509,7 @@ func (c *TemporalImpl) startHistory() {
func (c *TemporalImpl) startMatching() {
serviceName := primitives.MatchingService

for _, host := range c.hostsByService[serviceName].All {
for _, host := range c.hostsByService["grpc"][serviceName].All {
logger := log.With(c.logger, tag.Host(host))
app := fx.New(
fx.Supply(
Expand All @@ -575,7 +519,7 @@ func (c *TemporalImpl) startMatching() {
),
fx.Provide(c.GetMetricsHandler),
fx.Provide(func() listenHostPort { return listenHostPort(host) }),
fx.Provide(func() httpPort { return httpPort(frontendHTTPPort) }),
fx.Provide(func() httpPort { return portFromAddress(c.FrontendHTTPAddress()) }),
fx.Provide(func() log.Logger { return logger }),
fx.Provide(func() log.ThrottledLogger { return logger }),
fx.Provide(c.newRPCFactory),
Expand Down Expand Up @@ -626,7 +570,7 @@ func (c *TemporalImpl) startWorker() {
clusterConfigCopy.EnableGlobalNamespace = true
}

for _, host := range c.hostsByService[serviceName].All {
for _, host := range c.hostsByService["grpc"][serviceName].All {
logger := log.With(c.logger, tag.Host(host))
app := fx.New(
fx.Supply(
Expand All @@ -636,7 +580,7 @@ func (c *TemporalImpl) startWorker() {
),
fx.Provide(c.GetMetricsHandler),
fx.Provide(func() listenHostPort { return listenHostPort(host) }),
fx.Provide(func() httpPort { return httpPort(frontendHTTPPort) }),
fx.Provide(func() httpPort { return portFromAddress(c.FrontendHTTPAddress()) }),
fx.Provide(func() config.DCRedirectionPolicy { return config.DCRedirectionPolicy{} }),
fx.Provide(func() log.Logger { return logger }),
fx.Provide(func() log.ThrottledLogger { return logger }),
Expand Down Expand Up @@ -726,7 +670,7 @@ func (c *TemporalImpl) frontendConfigProvider() *config.Config {
Services: map[string]config.Service{
string(primitives.FrontendService): {
RPC: config.RPC{
HTTPPort: frontendHTTPPort,
HTTPPort: int(portFromAddress(c.FrontendHTTPAddress())),
HTTPAdditionalForwardedHeaders: []string{
"this-header-forwarded",
"this-header-prefix-forwarded-*",
Expand Down Expand Up @@ -949,3 +893,15 @@ func (c *TemporalImpl) overrideDynamicConfig(t *testing.T, name dynamicconfig.Ke
t.Cleanup(cleanup)
return cleanup
}

func portFromAddress(addr string) httpPort {
_, port, err := net.SplitHostPort(addr)
if err != nil {
panic(fmt.Errorf("Invalid address: %w", err))
}
portInt, err := strconv.Atoi(port)
if err != nil {
panic(fmt.Errorf("Cannot parse port: %w", err))
}
return httpPort(portInt)
}
45 changes: 40 additions & 5 deletions tests/testcore/test_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/membership/static"
"go.temporal.io/server/common/metrics/metricstest"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
Expand All @@ -62,6 +63,7 @@ import (
"go.temporal.io/server/common/primitives"
"go.temporal.io/server/common/rpc/encryption"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/internal/temporalite"
"go.temporal.io/server/temporal"
"go.temporal.io/server/tests/testutils"
"go.uber.org/fx"
Expand Down Expand Up @@ -192,13 +194,38 @@ func (f *defaultPersistenceTestBaseFactory) NewTestBase(options *persistencetest
func NewClusterWithPersistenceTestBaseFactory(t *testing.T, options *TestClusterConfig, logger log.Logger, tbFactory PersistenceTestBaseFactory) (*TestCluster, error) {
clusterNo := GetFreeClusterNumber()

// determine number of hosts per service
const minNodes = 1
options.FrontendConfig.NumFrontendHosts = max(minNodes, options.FrontendConfig.NumFrontendHosts)
options.HistoryConfig.NumHistoryHosts = max(minNodes, options.HistoryConfig.NumHistoryHosts)
options.MatchingConfig.NumMatchingHosts = max(minNodes, options.MatchingConfig.NumMatchingHosts)
options.WorkerConfig.NumWorkers = max(minNodes, options.WorkerConfig.NumWorkers)
if options.WorkerConfig.DisableWorker {
options.WorkerConfig.NumWorkers = 0
}

// allocate ports
pp := temporalite.NewPortProvider()
hostsByService := map[string]map[primitives.ServiceName]static.Hosts{
"grpc": map[primitives.ServiceName]static.Hosts{
primitives.FrontendService: static.Hosts{All: makeAddresses(pp, options.FrontendConfig.NumFrontendHosts)},
primitives.MatchingService: static.Hosts{All: makeAddresses(pp, options.MatchingConfig.NumMatchingHosts)},
primitives.HistoryService: static.Hosts{All: makeAddresses(pp, options.HistoryConfig.NumHistoryHosts)},
primitives.WorkerService: static.Hosts{All: makeAddresses(pp, options.WorkerConfig.NumWorkers)},
},
"http": map[primitives.ServiceName]static.Hosts{
primitives.FrontendService: static.Hosts{All: makeAddresses(pp, options.FrontendConfig.NumFrontendHosts)},
},
}
if err := pp.Close(); err != nil {
logger.Fatal("unable to close port provider listeners", tag.Error(err))
}

if len(options.ClusterMetadata.ClusterInformation) > 0 {
// set self-address for current cluster
// TODO: remove duplication between this and makeGRPCAddresses. we don't have a TemporalImpl
// yet so we can't just call that.
ci := options.ClusterMetadata.ClusterInformation[options.ClusterMetadata.CurrentClusterName]
ci.RPCAddress = fmt.Sprintf("127.0.%d.%d:%d", clusterNo, 1, frontendPort)
ci.HTTPAddress = fmt.Sprintf("127.0.%d.%d:%d", clusterNo, 1, frontendHTTPPort)
ci.RPCAddress = fmt.Sprintf("127.0.%d.%d:%d", clusterNo, 1, hostsByService["grpc"][primitives.FrontendService].All[0])

Check failure on line 227 in tests/testcore/test_cluster.go

View workflow job for this annotation

GitHub Actions / golangci

printf: fmt.Sprintf format %d has arg hostsByService["grpc"][primitives.FrontendService].All[0] of wrong type string (govet)
ci.HTTPAddress = fmt.Sprintf("127.0.%d.%d:%d", clusterNo, 1, hostsByService["http"][primitives.FrontendService].All[0])

Check failure on line 228 in tests/testcore/test_cluster.go

View workflow job for this annotation

GitHub Actions / golangci

printf: fmt.Sprintf format %d has arg hostsByService["http"][primitives.FrontendService].All[0] of wrong type string (govet)
options.ClusterMetadata.ClusterInformation[options.ClusterMetadata.CurrentClusterName] = ci
}

Expand Down Expand Up @@ -313,7 +340,6 @@ func NewClusterWithPersistenceTestBaseFactory(t *testing.T, options *TestCluster
VisibilityStoreFactory: testBase.VisibilityStoreFactory,
TaskMgr: testBase.TaskMgr,
Logger: logger,
ClusterNo: clusterNo,
ESConfig: options.ESConfig,
ESClient: esClient,
ArchiverMetadata: archiverBase.metadata,
Expand All @@ -328,6 +354,7 @@ func NewClusterWithPersistenceTestBaseFactory(t *testing.T, options *TestCluster
TLSConfigProvider: tlsConfigProvider,
ServiceFxOptions: options.ServiceFxOptions,
TaskCategoryRegistry: taskCategoryRegistry,
HostsByService: hostsByService,
}

if options.EnableMetricsCapture {
Expand Down Expand Up @@ -624,3 +651,11 @@ func createFixedTLSConfigProvider() (*encryption.FixedTLSConfigProvider, error)
FrontendClientConfig: clientTLSConfig,
}, nil
}

func makeAddresses(pp *temporalite.PortProvider, count int) []string {
hosts := make([]string, count)
for i := range hosts {
hosts[i] = fmt.Sprintf("127.0.0.1:%d", pp.MustGetFreePort())
}
return hosts
}

0 comments on commit 9f3131c

Please sign in to comment.