diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml
index 168f2efd..909be9a4 100644
--- a/.github/workflows/unit-tests.yml
+++ b/.github/workflows/unit-tests.yml
@@ -52,4 +52,4 @@ jobs:
     - name: latency server server unit tests
       run: cd cmd/latencyserver && go test -race
     - name: operator unit tests
-      run: IMAGE_NAME_SAMPLE=thundernetes-netcore IMAGE_NAME_INIT_CONTAINER=thundernetes-initcontainer TAG=$(git rev-list HEAD --max-count=1 --abbrev-commit) make -C pkg/operator test
\ No newline at end of file
+      run: make -C pkg/operator test
\ No newline at end of file
diff --git a/pkg/operator/controllers/controller_utils.go b/pkg/operator/controllers/controller_utils.go
index 0827ff0b..7159ac2d 100644
--- a/pkg/operator/controllers/controller_utils.go
+++ b/pkg/operator/controllers/controller_utils.go
@@ -123,21 +123,24 @@ func NewGameServerForGameServerBuild(gsb *mpsv1alpha1.GameServerBuild, portRegis
 		},
 		// we don't create any status since we have the .Status subresource enabled
 	}
+	// get host ports
+	// we assume that each portToExpose exists only once in the GameServer spec
+	hostPorts, err := portRegistry.GetNewPorts(len(gsb.Spec.PortsToExpose))
+	j := 0
+	if err != nil {
+		return nil, err
+	}
 	// assigning host ports for all the containers in the Template.Spec
 	for i := 0; i < len(gs.Spec.Template.Spec.Containers); i++ {
 		container := gs.Spec.Template.Spec.Containers[i]
 		for i := 0; i < len(container.Ports); i++ {
 			if sliceContainsPortToExpose(gsb.Spec.PortsToExpose, container.Ports[i].ContainerPort) {
-				port, err := portRegistry.GetNewPort()
-				if err != nil {
-					return nil, err
-				}
-				container.Ports[i].HostPort = port
-
+				container.Ports[i].HostPort = hostPorts[j]
 				// if the user has specified that they want to use the host's network, we override the container port
 				if gs.Spec.Template.Spec.HostNetwork {
-					container.Ports[i].ContainerPort = port
+					container.Ports[i].ContainerPort = hostPorts[j]
 				}
+				j++ // increase the hostPort index so on the next iteration (if any) we will use the next hostPort
 			}
 		}
 	}
diff --git a/pkg/operator/controllers/gameserverbuild_controller_test.go b/pkg/operator/controllers/gameserverbuild_controller_test.go
index d5a19cf5..8b14764e 100644
--- a/pkg/operator/controllers/gameserverbuild_controller_test.go
+++ b/pkg/operator/controllers/gameserverbuild_controller_test.go
@@ -2,18 +2,12 @@ package controllers
 
 import (
 	"context"
-	"time"
 
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 	"github.com/playfab/thundernetes/pkg/operator/api/v1alpha1"
 )
 
-const (
-	assertPollingInterval = 20 * time.Millisecond
-	assertTimeout         = 2 * time.Second
-)
-
 var _ = Describe("GameServerBuild controller tests", func() {
 	Context("testing creating a gameserverbuild creates game servers", func() {
 		ctx := context.Background()
diff --git a/pkg/operator/controllers/port_registry.go b/pkg/operator/controllers/port_registry.go
index aee1c2e6..92e83f71 100644
--- a/pkg/operator/controllers/port_registry.go
+++ b/pkg/operator/controllers/port_registry.go
@@ -23,6 +23,7 @@ type PortRegistry struct {
 	HostPortsPerNode                  map[int32]int // a slice for the entire port range. increases by 1 for each registered port
 	Min                               int32         // Minimum Port
 	Max                               int32         // Maximum Port
+	FreePortsCount                    int           // the number of free ports
 	lockMutex                         sync.Mutex    // lock for the map
 	useSpecificNodePoolForGameServers bool          // if true, we only take into account Nodes that have the Label "mps.playfab.com/gameservernode"=true
 	nextPortNumber                    int32         // the next port to be assigned
@@ -48,10 +49,11 @@ func NewPortRegistry(client client.Client, gameServers *mpsv1alpha1.GameServerLi
 		client:                            client,
 		Min:                               min,
 		Max:                               max,
+		NodeCount:                         nodeCount,
+		FreePortsCount:                    nodeCount * int(max-min+1), // +1 since the [min,max] ports set is inclusive of both edges
 		lockMutex:                         sync.Mutex{},
 		useSpecificNodePoolForGameServers: useSpecificNodePool,
 		nextPortNumber:                    min,
-		NodeCount:                         nodeCount,
 		logger:                            log.Log.WithName("portregistry"),
 	}
 
@@ -136,6 +138,7 @@ func (pr *PortRegistry) onNodeAdded() {
 	defer pr.lockMutex.Unlock()
 	pr.lockMutex.Lock()
 	pr.NodeCount++
+	pr.FreePortsCount += int(pr.Max - pr.Min + 1)
 }
 
 // onNodeRemoved is called when a Node is removed from the cluster
@@ -145,29 +148,44 @@ func (pr *PortRegistry) onNodeRemoved() {
 	defer pr.lockMutex.Unlock()
 	pr.lockMutex.Lock()
 	pr.NodeCount--
+	pr.FreePortsCount -= int(pr.Max - pr.Min + 1)
 }
 
-// GetNewPort returns and registers a new port for the designated game server
-// One may wonder what happens if two GameServer Pods get assigned the same HostPort
-// The answer is that we will not have a collision, since Kubernetes is pretty smart and will place the Pod on a different Node, to prevent it
-func (pr *PortRegistry) GetNewPort() (int32, error) {
+// GetNewPorts returns and registers a slice of ports with "count" length that will be used by a GameServer
+// It returns an error if there are no available ports
+// You may wonder what happens if two GameServer Pods get assigned the same HostPort
+// We will not have a collision, since Kubernetes is pretty smart and will place the Pod on a different Node, to prevent it
+func (pr *PortRegistry) GetNewPorts(count int) ([]int32, error) {
 	defer pr.lockMutex.Unlock()
 	pr.lockMutex.Lock()
-
-	for port := pr.nextPortNumber; port <= pr.Max; port++ {
-		// this port is used less than maximum times (where maximum is the number of nodes)
-		if pr.HostPortsPerNode[port] < pr.NodeCount {
-			pr.HostPortsPerNode[port]++
-			pr.nextPortNumber = port + 1
-			// we did a full cycle on the map
-			if pr.nextPortNumber > pr.Max {
-				pr.nextPortNumber = pr.Min
+	if count > pr.FreePortsCount {
+		return nil, errors.New("not enough free ports")
+	}
+	portsToReturn := make([]int32, count)
+	for i := 0; i < count; i++ {
+		portFound := false
+		// get the next port
+		for port := pr.nextPortNumber; port <= pr.Max; port++ {
+			// this port is used less times than the total number of Nodes
+			if pr.HostPortsPerNode[port] < pr.NodeCount {
+				pr.HostPortsPerNode[port]++  // increase the times (Nodes) this port is used
+				pr.nextPortNumber = port + 1 // move the next port to the next one
+				// we did a full circle on the map
+				if pr.nextPortNumber > pr.Max {
+					pr.nextPortNumber = pr.Min
+				}
+				pr.FreePortsCount--     // decrease the number of used ports
+				portsToReturn[i] = port // add the port to the slice to be returned
+				portFound = true
+				break // exit the loop
 			}
-			return port, nil
+		}
+		if !portFound {
+			// we made a full circle, no available ports
+			return nil, errors.New("cannot register a new port. No available ports")
 		}
 	}
-
-	return -1, errors.New("cannot register a new port. No available ports")
+	return portsToReturn, nil
 }
 
 // DeregisterServerPorts deregisters all host ports so they can be re-used by additional game servers
@@ -179,6 +197,7 @@ func (pr *PortRegistry) DeregisterServerPorts(ports []int32, gsName string) erro
 			// following log should NOT be changed since an e2e test depends on it
 			pr.logger.V(1).Info("Deregistering port", "port", ports[i], "GameServer", gsName)
 			pr.HostPortsPerNode[ports[i]]--
+			pr.FreePortsCount++
 		} else {
 			return fmt.Errorf("cannot deregister port %d, it is not registered or has already been deleted", ports[i])
 		}
@@ -194,6 +213,7 @@ func (pr *PortRegistry) assignRegisteredPorts(ports []int32) {
 	for i := 0; i < len(ports); i++ {
 		pr.logger.V(1).Info("Registering port", "port", ports[i])
 		pr.HostPortsPerNode[ports[i]]++
+		pr.FreePortsCount--
 	}
 }
 
diff --git a/pkg/operator/controllers/port_registry_test.go b/pkg/operator/controllers/port_registry_test.go
index 8fd8b82b..434d857e 100644
--- a/pkg/operator/controllers/port_registry_test.go
+++ b/pkg/operator/controllers/port_registry_test.go
@@ -37,11 +37,13 @@ var _ = Describe("Port registry tests", func() {
 		portRegistry, _ := getPortRegistryKubeClientForTesting(testMinPort, testMaxPort)
 		Expect(portRegistry.Min).To(Equal(int32(testMinPort)))
 		Expect(portRegistry.Max).To(Equal(int32(testMaxPort)))
+		Expect(portRegistry.FreePortsCount).To(Equal(testMaxPort - testMinPort + 1))
 		assignedPorts := make(map[int32]int)
 		// get 4 ports
+		ports, err := portRegistry.GetNewPorts(4)
+		Expect(err).ToNot(HaveOccurred())
 		for i := 0; i < 4; i++ {
-			port, err := portRegistry.GetNewPort()
-			Expect(err).ToNot(HaveOccurred())
+			port := ports[i]
 			validatePort(port, testMinPort, testMaxPort)
 			Expect(port).To(Equal(int32(testMinPort + i)))
 			if _, ok := assignedPorts[port]; ok {
@@ -55,9 +57,10 @@ var _ = Describe("Port registry tests", func() {
 	It("should fail to allocate more ports than the maximum", func() {
 		portRegistry, _ := getPortRegistryKubeClientForTesting(testMinPort, testMaxPort)
 		assignedPorts := make(map[int32]int)
-		for i := testMinPort; i <= testMaxPort; i++ {
-			port, err := portRegistry.GetNewPort()
-			Expect(err).ToNot(HaveOccurred())
+		ports, err := portRegistry.GetNewPorts(testMaxPort - testMinPort + 1)
+		Expect(err).To(Not(HaveOccurred()))
+		for i := 0; i < testMaxPort-testMinPort+1; i++ {
+			port := ports[i]
 			validatePort(port, testMinPort, testMaxPort)
 			if _, ok := assignedPorts[port]; ok {
 				Fail(fmt.Sprintf("Port %d should not be in the assignedPorts map", port))
@@ -67,7 +70,7 @@ var _ = Describe("Port registry tests", func() {
 		verifyExpectedHostPorts(portRegistry, assignedPorts, 10)
 
 		// this one should fail
-		_, err := portRegistry.GetNewPort()
+		_, err = portRegistry.GetNewPorts(1)
 		Expect(err).To(HaveOccurred())
 	})
 
@@ -79,14 +82,14 @@ var _ = Describe("Port registry tests", func() {
 		// do a manual reconcile since we haven't added the controller to the manager
 		portRegistry.Reconcile(context.Background(), reconcile.Request{})
 		Eventually(func() error {
-			return verifyHostPortsPerNode(portRegistry, 2)
+			return verifyHostPortsPerNode(portRegistry, 2, 2*(testMaxPort-testMinPort+1))
 		}).Should(Succeed())
 
 		err = kubeClient.Delete(context.Background(), node)
 		Expect(err).ToNot(HaveOccurred())
 		portRegistry.Reconcile(context.Background(), reconcile.Request{})
 		Eventually(func() error {
-			return verifyHostPortsPerNode(portRegistry, 1)
+			return verifyHostPortsPerNode(portRegistry, 1, 1*(testMaxPort-testMinPort+1))
 		}).Should(Succeed())
 	})
 
@@ -98,14 +101,15 @@ var _ = Describe("Port registry tests", func() {
 		// do a manual reconcile since we haven't added the controller to the manager
 		portRegistry.Reconcile(context.Background(), reconcile.Request{})
 		Eventually(func() error {
-			return verifyHostPortsPerNode(portRegistry, 2)
+			return verifyHostPortsPerNode(portRegistry, 2, 2*(testMaxPort-testMinPort+1))
 		}).Should(Succeed())
 
 		assignedPorts := make(map[int32]int)
 		// get 15 ports
+		ports, err := portRegistry.GetNewPorts(15)
+		Expect(err).ToNot(HaveOccurred())
 		for i := 0; i < 15; i++ {
-			port, err := portRegistry.GetNewPort()
-			Expect(err).ToNot(HaveOccurred())
+			port := ports[i]
 			validatePort(port, testMinPort, testMaxPort)
 			assignedPorts[port] = assignedPorts[port] + 1
 		}
@@ -118,9 +122,11 @@ var _ = Describe("Port registry tests", func() {
 		Expect(portRegistry.Min).To(Equal(int32(testMinPort)))
 		Expect(portRegistry.Max).To(Equal(int32(testMaxPort)))
 		assignedPorts := make(map[int32]int)
-		// get 10 ports
+		// allocate 10 ports
+		ports, err := portRegistry.GetNewPorts(10)
+		Expect(err).To(Not(HaveOccurred()))
 		for i := 0; i < 10; i++ {
-			port, err := portRegistry.GetNewPort()
+			port := ports[i]
 			Expect(err).ToNot(HaveOccurred())
 			validatePort(port, testMinPort, testMaxPort)
 			if _, ok := assignedPorts[port]; ok {
@@ -131,11 +137,11 @@ var _ = Describe("Port registry tests", func() {
 		verifyExpectedHostPorts(portRegistry, assignedPorts, 10)
 
 		// deallocate two ports
-		err := portRegistry.DeregisterServerPorts([]int32{testMinPort + 1, testMinPort + 3}, testGsName)
+		err = portRegistry.DeregisterServerPorts([]int32{testMinPort + 1, testMinPort + 3}, testGsName)
 		Expect(err).ToNot(HaveOccurred())
 		delete(assignedPorts, testMinPort+1)
 		delete(assignedPorts, testMinPort+3)
-		verifyExpectedHostPorts(portRegistry, assignedPorts, 8)
+		verifyExpectedHostPorts(portRegistry, assignedPorts, 8) // 10 minus two
 	})
 
 	It("should successfully deallocate ports - two Nodes to one scenario", func() {
@@ -144,8 +150,10 @@ var _ = Describe("Port registry tests", func() {
 		Expect(portRegistry.Max).To(Equal(int32(testMaxPort)))
 		assignedPorts := make(map[int32]int)
 		// get 10 ports
+		ports, err := portRegistry.GetNewPorts(10)
+		Expect(err).To(Not(HaveOccurred()))
 		for i := 0; i < 10; i++ {
-			port, err := portRegistry.GetNewPort()
+			port := ports[i]
 			Expect(err).ToNot(HaveOccurred())
 			validatePort(port, testMinPort, testMaxPort)
 			if _, ok := assignedPorts[port]; ok {
@@ -155,11 +163,11 @@ var _ = Describe("Port registry tests", func() {
 		}
 		verifyExpectedHostPorts(portRegistry, assignedPorts, 10)
 		// deallocate two ports
-		err := portRegistry.DeregisterServerPorts([]int32{testMinPort + 1, testMinPort + 3}, testGsName)
+		err = portRegistry.DeregisterServerPorts([]int32{testMinPort + 1, testMinPort + 3}, testGsName)
 		Expect(err).ToNot(HaveOccurred())
 		delete(assignedPorts, testMinPort+1)
 		delete(assignedPorts, testMinPort+3)
-		verifyExpectedHostPorts(portRegistry, assignedPorts, 8)
+		verifyExpectedHostPorts(portRegistry, assignedPorts, 8) // ten minus two
 
 		// add a second Node
 		node := getNewNodeForTest("node2")
@@ -168,13 +176,14 @@ var _ = Describe("Port registry tests", func() {
 		// do a manual reconcile since we haven't added the controller to the manager
 		portRegistry.Reconcile(context.Background(), reconcile.Request{})
 		Eventually(func() error {
-			return verifyHostPortsPerNode(portRegistry, 2)
+			return verifyHostPortsPerNode(portRegistry, 2, 2*(testMaxPort-testMinPort+1)-8) // ten minus two
 		}).Should(Succeed())
 
-		// get 8 ports, we have 16 in total
+		// get 8 more ports, we have 16 in total
+		ports, err = portRegistry.GetNewPorts(8)
+		Expect(err).To(Not(HaveOccurred()))
 		for i := 0; i < 8; i++ {
-			port, err := portRegistry.GetNewPort()
-			Expect(err).ToNot(HaveOccurred())
+			port := ports[i]
 			validatePort(port, testMinPort, testMaxPort)
 			assignedPorts[port] = assignedPorts[port] + 1
 		}
@@ -191,14 +200,14 @@ var _ = Describe("Port registry tests", func() {
 			}
 		}
 		Expect(deletedPortsCount).To(Equal(6))
-		verifyExpectedHostPorts(portRegistry, assignedPorts, 10)
+		verifyExpectedHostPorts(portRegistry, assignedPorts, 10) // 16 minus 6
 
 		// now delete the second node
 		err = kubeClient.Delete(context.Background(), node)
 		Expect(err).ToNot(HaveOccurred())
 		portRegistry.Reconcile(context.Background(), reconcile.Request{})
 		Eventually(func() error {
-			return verifyHostPortsPerNode(portRegistry, 1)
+			return verifyHostPortsPerNode(portRegistry, 1, 1*(testMaxPort-testMinPort+1)-10) // 16 minus 6
 		}).Should(Succeed())
 
 		verifyExpectedHostPorts(portRegistry, assignedPorts, 10)
@@ -208,7 +217,7 @@ var _ = Describe("Port registry tests", func() {
 		delete(assignedPorts, testMinPort+1)
 		delete(assignedPorts, testMinPort+2)
 		delete(assignedPorts, testMinPort+3)
-		verifyExpectedHostPorts(portRegistry, assignedPorts, 7)
+		verifyExpectedHostPorts(portRegistry, assignedPorts, 7) // 10 minus three
 	})
 })
 
@@ -232,11 +241,11 @@ var _ = Describe("Port registry with two thousand ports, five hundred on four no
 	}
 
 	Eventually(func() error {
-		return verifyHostPortsPerNode(portRegistry, 4)
+		return verifyHostPortsPerNode(portRegistry, 4, 4*int(max-min+1))
 	}).Should(Succeed())
 
 	It("should work with allocating and deallocating ports", func() {
-		// allocate all 2000 ports
+		// allocate all 2000 ports, one at a time
 		var wg sync.WaitGroup
 		for i := 0; i < int(max-min+1)*4; i++ {
 			wg.Add(1)
@@ -244,13 +253,13 @@ var _ = Describe("Port registry with two thousand ports, five hundred on four no
 				defer wg.Done()
 				n := rand.Intn(200) + 50 // n will be between 50 and 250
 				time.Sleep(time.Duration(n) * time.Millisecond)
-				port, err := portRegistry.GetNewPort()
+				ports, err := portRegistry.GetNewPorts(1)
 				Expect(err).ToNot(HaveOccurred())
-				val, ok := assignedPorts.Load(port)
+				val, ok := assignedPorts.Load(ports[0])
 				if !ok {
-					assignedPorts.Store(port, 1)
+					assignedPorts.Store(ports[0], 1)
 				} else {
-					assignedPorts.Store(port, val.(int)+1)
+					assignedPorts.Store(ports[0], val.(int)+1)
 				}
 			}()
 		}
@@ -259,7 +268,7 @@ var _ = Describe("Port registry with two thousand ports, five hundred on four no
 		verifyExpectedHostPorts(portRegistry, m, 2000)
 
 		// trying to get another port should fail, since we've allocated every available port
-		_, err := portRegistry.GetNewPort()
+		_, err := portRegistry.GetNewPorts(1)
 		Expect(err).To(HaveOccurred())
 
 		//deallocate 1000 ports
@@ -290,8 +299,9 @@ var _ = Describe("Port registry with two thousand ports, five hundred on four no
 				defer wg.Done()
 				n := rand.Intn(200) + 50 // n will be between 50 and 250
 				time.Sleep(time.Duration(n) * time.Millisecond)
-				port, err := portRegistry.GetNewPort()
+				ports, err := portRegistry.GetNewPorts(1)
 				Expect(err).ToNot(HaveOccurred())
+				port := ports[0]
 				val, ok := assignedPorts.Load(port)
 				if !ok {
 					assignedPorts.Store(port, 1)
@@ -312,8 +322,9 @@ var _ = Describe("Port registry with two thousand ports, five hundred on four no
 				defer wg.Done()
 				n := rand.Intn(200) + 50 // n will be between 50 and 250
 				time.Sleep(time.Duration(n) * time.Millisecond)
-				port, err := portRegistry.GetNewPort()
+				ports, err := portRegistry.GetNewPorts(1)
 				Expect(err).ToNot(HaveOccurred())
+				port := ports[0]
 				val, ok := assignedPorts.Load(port)
 				if !ok {
 					assignedPorts.Store(port, 1)
@@ -328,7 +339,7 @@ var _ = Describe("Port registry with two thousand ports, five hundred on four no
 		verifyExpectedHostPorts(portRegistry, m, 2000)
 
 		// trying to get another port should fail, since we've allocated every available port
-		_, err = portRegistry.GetNewPort()
+		_, err = portRegistry.GetNewPorts(1)
 		Expect(err).To(HaveOccurred())
 	})
 })
@@ -355,10 +366,13 @@ func verifyExpectedHostPorts(portRegistry *PortRegistry, expectedHostPorts map[i
 
 // verifyHostPortsPerNode verifies that the hostPortsPerNode map on the PortRegistry has the proper length
 // and its item has the correct length as well
-func verifyHostPortsPerNode(portRegistry *PortRegistry, expectedNodeCount int) error {
+func verifyHostPortsPerNode(portRegistry *PortRegistry, expectedNodeCount, expectedFreePortsCount int) error {
 	if portRegistry.NodeCount != expectedNodeCount {
 		return fmt.Errorf("NodeCount is not %d, it is %d", expectedNodeCount, portRegistry.NodeCount)
 	}
+	if portRegistry.FreePortsCount != expectedFreePortsCount {
+		return fmt.Errorf("FreePortsCount is not %d, it is %d", expectedFreePortsCount, portRegistry.FreePortsCount)
+	}
 	return nil
 }
 
diff --git a/pkg/operator/controllers/suite_test.go b/pkg/operator/controllers/suite_test.go
index 7bcb8389..a5a31350 100644
--- a/pkg/operator/controllers/suite_test.go
+++ b/pkg/operator/controllers/suite_test.go
@@ -21,6 +21,7 @@ import (
 	"path/filepath"
 	"strings"
 	"testing"
+	"time"
 
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
@@ -35,6 +36,11 @@ import (
 	//+kubebuilder:scaffold:imports
 )
 
+const (
+	assertPollingInterval = 20 * time.Millisecond
+	assertTimeout         = 2 * time.Second
+)
+
 // These tests use Ginkgo (BDD-style Go testing framework). Refer to
 // http://onsi.github.io/ginkgo/ to learn more about Ginkgo.
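
A minimal, self-contained sketch of the allocation bookkeeping introduced above: `FreePortsCount` starts at `NodeCount * (Max - Min + 1)`, `GetNewPorts(count)` fails fast when `count` exceeds it, and each port in `[Min, Max]` can be registered once per Node. The `registry` and `getNewPorts` names below are illustrative stand-ins for this note only, not identifiers from the repository.

```go
package main

import (
	"errors"
	"fmt"
)

// registry is a trimmed-down stand-in for PortRegistry: hostPortsPerNode counts
// how many Nodes already use each port, and freePortsCount starts at
// nodeCount*(max-min+1) and shrinks on every grant.
type registry struct {
	min, max         int32
	nodeCount        int
	freePortsCount   int
	hostPortsPerNode map[int32]int
	nextPort         int32
}

func newRegistry(min, max int32, nodeCount int) *registry {
	return &registry{
		min:              min,
		max:              max,
		nodeCount:        nodeCount,
		freePortsCount:   nodeCount * int(max-min+1),
		hostPortsPerNode: make(map[int32]int),
		nextPort:         min,
	}
}

// getNewPorts mirrors the GetNewPorts logic in the diff: reject the request up
// front when fewer than count slots are free, then hand out ports round-robin,
// reusing a port only while it is registered on fewer than nodeCount Nodes.
func (r *registry) getNewPorts(count int) ([]int32, error) {
	if count > r.freePortsCount {
		return nil, errors.New("not enough free ports")
	}
	out := make([]int32, count)
	for i := 0; i < count; i++ {
		found := false
		for port := r.nextPort; port <= r.max; port++ {
			if r.hostPortsPerNode[port] < r.nodeCount {
				r.hostPortsPerNode[port]++ // one more Node now uses this port
				r.nextPort = port + 1
				if r.nextPort > r.max {
					r.nextPort = r.min // wrap around for the next request
				}
				r.freePortsCount--
				out[i] = port
				found = true
				break
			}
		}
		if !found {
			return nil, errors.New("no available ports")
		}
	}
	return out, nil
}

func main() {
	r := newRegistry(20000, 20004, 2) // 5 ports on 2 Nodes -> 10 free slots
	ports, err := r.getNewPorts(7)
	fmt.Println(ports, r.freePortsCount, err)
	// prints: [20000 20001 20002 20003 20004 20000 20001] 3 <nil>
}
```

The up-front `FreePortsCount` check is what lets a batch request be rejected before any port is registered, which is why the counter is also adjusted on node add/remove, deregistration, and `assignRegisteredPorts`.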