Skip to content

Commit

Permalink
fix ServiceSpec race conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
hagay3 committed May 28, 2022
1 parent 3f414d4 commit d106021
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 35 deletions.
77 changes: 50 additions & 27 deletions client/src/it/scala/skuber/ServiceSpec.scala
Original file line number Diff line number Diff line change
@@ -1,50 +1,73 @@
package skuber

import skuber.json.format.serviceFmt
import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually

import scala.concurrent.Await
import org.scalatest.concurrent.{Eventually, ScalaFutures}
import org.scalatest.{BeforeAndAfterAll, Matchers}
import skuber.json.format.{serviceFmt, serviceListFmt}
import scala.concurrent.duration._
import scala.util.{Failure, Random, Success}
import scala.util.Random

class ServiceSpec extends K8SFixture with Eventually with BeforeAndAfterAll with ScalaFutures with Matchers {

override implicit val patienceConfig: PatienceConfig = PatienceConfig(10.second)

val defaultLabels = Map("app" -> this.suiteName)

override def afterAll(): Unit = {
val k8s = k8sInit
val requirements = defaultLabels.toSeq.map { case (k, v) => LabelSelector.IsEqualRequirement(k, v) }
val labelSelector = LabelSelector(requirements: _*)
val results = k8s.deleteAllSelected[ServiceList](labelSelector).recover{ case _ => () }
results.futureValue

results.onComplete { _ =>
k8s.close
}
}

def nginxServiceName: String = Random.alphanumeric.filter(_.isLetter).take(20).mkString.toLowerCase

class ServiceSpec extends K8SFixture with Eventually with Matchers {
val nginxServiceName: String = Random.alphanumeric.filter(_.isLetter).take(20).mkString.toLowerCase

behavior of "Service"

it should "create a service" in { k8s =>
k8s.create(getService(nginxServiceName)) map { p =>
assert(p.name == nginxServiceName)
}
val serviceName1: String = nginxServiceName
val p = k8s.create(getService(serviceName1)).futureValue
assert(p.name == serviceName1)

}

it should "get the newly created service" in { k8s =>
k8s.get[Service](nginxServiceName) map { d =>
assert(d.name == nginxServiceName)
// Default ServiceType is ClusterIP
assert(d.spec.map(_._type) == Option(Service.Type.ClusterIP))
}
val serviceName2: String = nginxServiceName
k8s.create(getService(serviceName2)).futureValue
val d = k8s.get[Service](serviceName2).futureValue
assert(d.name == serviceName2)
// Default ServiceType is ClusterIP
assert(d.spec.map(_._type) == Option(Service.Type.ClusterIP))

}

it should "delete a service" in { k8s =>
k8s.delete[Service](nginxServiceName).map { _ =>
eventually(timeout(100.seconds), interval(3.seconds)) {
val retrieveService = k8s.get[Service](nginxServiceName)
val serviceRetrieved = Await.ready(retrieveService, 2.seconds).value.get
serviceRetrieved match {
case s: Success[_] => assert(false)
case Failure(ex) => ex match {
case ex: K8SException if ex.status.code.contains(404) => assert(true)
case _ => assert(false)
}
val serviceName3: String = nginxServiceName
k8s.create(getService(serviceName3)).futureValue
k8s.delete[Service](serviceName3).futureValue
eventually(timeout(20.seconds), interval(3.seconds)) {

whenReady(
k8s.get[Service](serviceName3).failed
) { result =>
result shouldBe a[K8SException]
result match {
case ex: K8SException => ex.status.code shouldBe Some(404)
case _ => assert(false)
}
}
}

}

def getService(name: String): Service = {
val spec: Service.Spec = Service.Spec(ports = List(Service.Port(port = 80)), selector = Map("app" -> "nginx"))
Service(name, spec)
val serviceMeta = ObjectMeta(name = name, labels = defaultLabels)
Service(name, spec).copy(metadata = serviceMeta)
}
}
13 changes: 5 additions & 8 deletions client/src/it/scala/skuber/WatchContinuouslySpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ import org.scalatest.Matchers
import org.scalatest.concurrent.{Eventually, ScalaFutures}
import org.scalatest.time.{Seconds, Span}
import skuber.apps.v1.{Deployment, DeploymentList}

import scala.concurrent.duration._
import scala.concurrent.Await

import scala.language.postfixOps

class WatchContinuouslySpec extends K8SFixture with Eventually with Matchers with ScalaFutures {
Expand All @@ -35,7 +32,7 @@ class WatchContinuouslySpec extends K8SFixture with Eventually with Matchers wit
}

// Wait for watch to be confirmed before performing the actions that create new events to be watched
Await.result(stream, 5.seconds)
stream.futureValue

//Create first deployment and delete it.
k8s.create(deploymentOne).futureValue.name shouldBe deploymentOneName
Expand All @@ -50,7 +47,7 @@ class WatchContinuouslySpec extends K8SFixture with Eventually with Matchers wit
* This will ensure multiple requests are performed by
* the source including empty responses
*/
pause(62.seconds)
pause(10.seconds)

//Create second deployment and delete it.
k8s.create(deploymentTwo).futureValue.name shouldBe deploymentTwoName
Expand Down Expand Up @@ -100,7 +97,7 @@ class WatchContinuouslySpec extends K8SFixture with Eventually with Matchers wit
* This will ensure multiple requests are performed by
* the source including empty responses
*/
pause(62.seconds)
pause(20.seconds)

k8s.delete[Deployment](deploymentName).futureValue

Expand Down Expand Up @@ -146,7 +143,7 @@ class WatchContinuouslySpec extends K8SFixture with Eventually with Matchers wit
* This will ensure multiple requests are performed by
* the source including empty responses
*/
pause(62.seconds)
pause(20.seconds)

k8s.delete[Deployment](deploymentName).futureValue

Expand Down Expand Up @@ -189,7 +186,7 @@ class WatchContinuouslySpec extends K8SFixture with Eventually with Matchers wit
* This will ensure multiple requests are performed by
* the source including empty responses
*/
pause(62.seconds)
pause(20.seconds)

k8s.delete[Deployment](deploymentName).futureValue

Expand Down

0 comments on commit d106021

Please sign in to comment.