Skip to content

Commit

Permalink
improved reliability of integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
epinzur committed Jul 10, 2023
1 parent 4e513e0 commit 4acddbe
Show file tree
Hide file tree
Showing 15 changed files with 208 additions and 216 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ RUN cargo build --bin sparrow-main
###############################################################################
# Build the wren code

FROM golang:1.19 AS wren-build
FROM golang:1.19.9 AS wren-build

RUN mkdir -p /builds/kaskada/wren
RUN mkdir -p /builds/kaskada/gen/proto/go
Expand Down
6 changes: 3 additions & 3 deletions Dockerfile.go_cross_amd64_to_arm64
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
# same lib versions same cross compilers
FROM ghcr.io/cross-rs/aarch64-unknown-linux-gnu:main

#Install Go 1.19.7
RUN curl -OL https://go.dev/dl/go1.19.7.linux-amd64.tar.gz && \
tar -C /usr/local/ -xzf go1.19.7.linux-amd64.tar.gz
#Install Go 1.19.9
RUN curl -OL https://go.dev/dl/go1.19.9.linux-amd64.tar.gz && \
tar -C /usr/local/ -xzf go1.19.9.linux-amd64.tar.gz

ENV PATH="$PATH:/usr/local/go/bin:/root/go/bin"

Expand Down
10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,19 @@ proto/generate: proto/clean proto/kaskada
@ echo done!

test/int/docker-up:
docker compose -f ./tests/integration/docker-compose.yml up --build --remove-orphans
docker compose -f ./tests/integration/docker-compose.yml up --build --remove-orphans --force-recreate

test/int/docker-up-s3:
docker compose -f ./tests/integration/docker-compose.yml -f ./tests/integration/docker-compose.s3.yml up --build --remove-orphans
docker compose -f ./tests/integration/docker-compose.yml -f ./tests/integration/docker-compose.s3.yml up --build --remove-orphans --force-recreate

test/int/docker-up-s3-only:
docker compose -f ./tests/integration/docker-compose.yml -f ./tests/integration/docker-compose.s3.yml up --build --remove-orphans minio
docker compose -f ./tests/integration/docker-compose.yml -f ./tests/integration/docker-compose.s3.yml up --build --remove-orphans --force-recreate minio

test/int/docker-up-postgres:
docker compose -f ./tests/integration/docker-compose.yml -f ./tests/integration/docker-compose.postgres.yml up --build --remove-orphans
docker compose -f ./tests/integration/docker-compose.yml -f ./tests/integration/docker-compose.postgres.yml up --build --remove-orphans --force-recreate

test/int/docker-up-postgres-s3:
docker compose -f ./tests/integration/docker-compose.yml -f ./tests/integration/docker-compose.postgres.yml -f ./tests/integration/docker-compose.s3.yml up --build --remove-orphans
docker compose -f ./tests/integration/docker-compose.yml -f ./tests/integration/docker-compose.postgres.yml -f ./tests/integration/docker-compose.s3.yml up --build --remove-orphans --force-recreate

test/int/run-api-docker:
cd tests/integration/api && ENV=local-docker go run github.com/onsi/ginkgo/v2/ginkgo -vv ./...
Expand Down
6 changes: 5 additions & 1 deletion tests/integration/api/api_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func getPulsarConfig(topicName string) *v1alpha.PulsarConfig {
}

func receivePulsarMessageWithTimeout(pulsarConsumer pulsar.Consumer, ctx context.Context) pulsar.Message {
timeout, timeoutCancel := context.WithTimeout(ctx, 100 * time.Millisecond)
timeout, timeoutCancel := context.WithTimeout(ctx, 250*time.Millisecond)
defer timeoutCancel()
msg, err := pulsarConsumer.Receive(timeout)
if err != nil {
Expand All @@ -235,3 +235,7 @@ func receivePulsarMessageWithTimeout(pulsarConsumer pulsar.Consumer, ctx context
return msg
}
}

func getUniqueName(tableNamePrefix string) string {
return fmt.Sprintf("%s_%d", tableNamePrefix, time.Now().Unix())
}
8 changes: 6 additions & 2 deletions tests/integration/api/graceful_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,10 @@ max_spent_in_single_transaction: max(transactions.price * transactions.quantity)
g.Expect(dataType).Should(Equal("INT64"))
g.Expect(shape).Should(Equal([]int64{1, 3}))
g.Expect(values).Should(Equal([]int64{20150106, 149, 149}))
}, "30s", "1s").Should(Succeed())

dataType, shape, values, err = redisAIClient.TensorGetValues(key2)
Eventually(func(g Gomega) {
dataType, shape, values, err := redisAIClient.TensorGetValues(key2)
g.Expect(err).ShouldNot(HaveOccurred())
g.Expect(dataType).Should(Equal("INT64"))
g.Expect(shape).Should(Equal([]int64{1, 3}))
Expand All @@ -235,8 +237,10 @@ max_spent_in_single_transaction: max(transactions.price * transactions.quantity)
g.Expect(dataType).Should(Equal("INT64"))
g.Expect(shape).Should(Equal([]int64{1, 3}))
g.Expect(values).Should(Equal([]int64{20150109, 149, 100}))
}, "30s", "1s").Should(Succeed())

dataType, shape, values, err = redisAIClient.TensorGetValues(key2)
Eventually(func(g Gomega) {
dataType, shape, values, err := redisAIClient.TensorGetValues(key2)
g.Expect(err).ShouldNot(HaveOccurred())
g.Expect(dataType).Should(Equal("INT64"))
g.Expect(shape).Should(Equal([]int64{1, 3}))
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/api/mat_pulsar_to_obj_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ var _ = Describe("Materialization from Pulsar to ObjectStore", Ordered, Label("p
g.Expect(results).Should(HaveLen(2)) //header row + 1 data row
g.Expect(results[0]).Should(ContainElements("_time", "_subsort", "_key_hash", "last_id", "last_time", "count"))
g.Expect(results[1]).Should(ContainElements("2023-06-20T23:30:01.000000000", "0", "2122274938272070218", "9", "9", "1687303801000000000", "1"))
}, "5s", "1s").Should(Succeed())
}, "10s", "1s").Should(Succeed())
})
})

Expand Down Expand Up @@ -207,7 +207,7 @@ var _ = Describe("Materialization from Pulsar to ObjectStore", Ordered, Label("p
g.Expect(results[1]).Should(ContainElements("2023-06-20T23:30:03.000000000", "1", "1575016611515860288", "2", "2", "1687303803000000000", "1"))
g.Expect(results[1]).Should(ContainElements("2023-06-20T23:30:05.000000000", "2", "11820145550582457114", "4", "4", "1687303805000000000", "1"))
}
}, "5s", "1s").Should(Succeed())
}, "10s", "1s").Should(Succeed())
})
})
})
9 changes: 5 additions & 4 deletions tests/integration/api/mat_pulsar_to_pulsar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type pulsarToPulsarTestSchema struct {
Count int `json:"count"`
}

var _ = Describe("Materialization from Pulsar to Pulsar", Ordered, Label("pulsar"), FlakeAttempts(3), func() {
var _ = Describe("Materialization from Pulsar to Pulsar", Ordered, Label("pulsar"), func() {
var (
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -176,7 +176,7 @@ var _ = Describe("Materialization from Pulsar to Pulsar", Ordered, Label("pulsar
g.Expect(data.Count).Should(Equal(1))

g.Expect(pulsarConsumer.Ack(msg)).Should(Succeed())
}, "5s", "1s").Should(Succeed())
}, "10s", "1s").Should(Succeed())
})
})

Expand Down Expand Up @@ -204,7 +204,9 @@ var _ = Describe("Materialization from Pulsar to Pulsar", Ordered, Label("pulsar
g.Expect(data.LastTime).Should(Equal(1687303803000000000))
g.Expect(data.Count).Should(Equal(1))
g.Expect(pulsarConsumer.Ack(msg)).Should(Succeed())
}, "10s", "1s").Should(Succeed())

Eventually(func(g Gomega) {
msg2 := receivePulsarMessageWithTimeout(pulsarConsumer, ctx)
g.Expect(msg2).ShouldNot(BeNil())

Expand All @@ -214,8 +216,7 @@ var _ = Describe("Materialization from Pulsar to Pulsar", Ordered, Label("pulsar
g.Expect(data2.LastId).Should(Equal(4))
g.Expect(data2.LastTime).Should(Equal(1687303805000000000))
g.Expect(data2.Count).Should(Equal(1))

}, "5s", "1s").Should(Succeed())
}, "10s", "1s").Should(Succeed())
})
})
})
26 changes: 13 additions & 13 deletions tests/integration/api/mat_table_to_pulsar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var _ = Describe("Materialization with Pulsar upload", Ordered, Label("pulsar"),
topicName string
materializationClient v1alpha.MaterializationServiceClient
materializationName string
msg pulsar.Message
msg pulsar.Message
)

BeforeAll(func() {
Expand All @@ -56,7 +56,7 @@ var _ = Describe("Materialization with Pulsar upload", Ordered, Label("pulsar"),
Expect(err).ShouldNot(HaveOccurred())

// create a pulsar consumer
topicName = "topic_tableToPulsar"
topicName = "topic_tableToPulsar"
pulsarConsumer, err = pulsarClient.Subscribe(pulsar.ConsumerOptions{
Topic: topicName,
SubscriptionName: uuid.New().String(),
Expand Down Expand Up @@ -90,7 +90,7 @@ var _ = Describe("Materialization with Pulsar upload", Ordered, Label("pulsar"),
conn.Close()
pulsarConsumer.Close()
pulsarClient.Close()

// attempt to delete pulsar topic used in test
cfg := &pulsaradmin.Config{}
cfg.WebServiceURL = "http://localhost:8080"
Expand Down Expand Up @@ -135,7 +135,7 @@ min_amount: table_tableToPulsar.amount | min(),
Eventually(func(g Gomega) {
msg = receivePulsarMessageWithTimeout(pulsarConsumer, ctx)
g.Expect(msg).ShouldNot(BeNil())

var data pulsarTestSchema
err = json.Unmarshal(msg.Payload(), &data)
g.Expect(err).ShouldNot(HaveOccurred())
Expand All @@ -144,13 +144,13 @@ min_amount: table_tableToPulsar.amount | min(),
g.Expect(data.MinAmount).Should(Equal(2))

g.Expect(pulsarConsumer.Ack(msg)).Should(Succeed())
}, "5s", "1s").Should(Succeed())
}, "10s", "1s").Should(Succeed())

// Verify the second message
Eventually(func(g Gomega) {
msg = receivePulsarMessageWithTimeout(pulsarConsumer, ctx)
g.Expect(msg).ShouldNot(BeNil())

var data pulsarTestSchema
err = json.Unmarshal(msg.Payload(), &data)
g.Expect(err).ShouldNot(HaveOccurred())
Expand All @@ -159,7 +159,7 @@ min_amount: table_tableToPulsar.amount | min(),
g.Expect(data.MinAmount).Should(Equal(3))

g.Expect(pulsarConsumer.Ack(msg)).Should(Succeed())
}, "5s", "1s").Should(Succeed())
}, "10s", "1s").Should(Succeed())
})
})

Expand All @@ -174,7 +174,7 @@ min_amount: table_tableToPulsar.amount | min(),
Eventually(func(g Gomega) {
msg = receivePulsarMessageWithTimeout(pulsarConsumer, ctx)
g.Expect(msg).ShouldNot(BeNil())

var data pulsarTestSchema
err = json.Unmarshal(msg.Payload(), &data)
g.Expect(err).ShouldNot(HaveOccurred())
Expand All @@ -183,13 +183,13 @@ min_amount: table_tableToPulsar.amount | min(),
g.Expect(data.MinAmount).Should(Equal(2))

g.Expect(pulsarConsumer.Ack(msg)).Should(Succeed())
}, "5s", "1s").Should(Succeed())
}, "10s", "1s").Should(Succeed())

// Verify the second message
Eventually(func(g Gomega) {
msg = receivePulsarMessageWithTimeout(pulsarConsumer, ctx)
g.Expect(msg).ShouldNot(BeNil())

var data pulsarTestSchema
err = json.Unmarshal(msg.Payload(), &data)
g.Expect(err).ShouldNot(HaveOccurred())
Expand All @@ -198,13 +198,13 @@ min_amount: table_tableToPulsar.amount | min(),
g.Expect(data.MinAmount).Should(Equal(7))

g.Expect(pulsarConsumer.Ack(msg)).Should(Succeed())
}, "5s", "1s").Should(Succeed())
}, "10s", "1s").Should(Succeed())

// Verify the third message
Eventually(func(g Gomega) {
msg = receivePulsarMessageWithTimeout(pulsarConsumer, ctx)
g.Expect(msg).ShouldNot(BeNil())

var data pulsarTestSchema
err = json.Unmarshal(msg.Payload(), &data)
g.Expect(err).ShouldNot(HaveOccurred())
Expand All @@ -213,7 +213,7 @@ min_amount: table_tableToPulsar.amount | min(),
g.Expect(data.MinAmount).Should(Equal(2))

g.Expect(pulsarConsumer.Ack(msg)).Should(Succeed())
}, "5s", "1s").Should(Succeed())
}, "10s", "1s").Should(Succeed())
})
})
})
7 changes: 5 additions & 2 deletions tests/integration/api/materializations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var _ = Describe("Materializations", Ordered, Label("redis"), Label("redis-ai"),
var cancel context.CancelFunc
var conn *grpc.ClientConn
var tableClient v1alpha.TableServiceClient
var tableName string
var matClient v1alpha.MaterializationServiceClient
var mat1, mat2 *v1alpha.Materialization
var maxAmount, minAmount string
Expand All @@ -35,6 +36,8 @@ var _ = Describe("Materializations", Ordered, Label("redis"), Label("redis-ai"),
tableClient = v1alpha.NewTableServiceClient(conn)
matClient = v1alpha.NewMaterializationServiceClient(conn)

tableName = "purchases_mat_test"

maxAmount = `
{
time: purchases_mat_test.purchase_time,
Expand All @@ -60,7 +63,7 @@ min_amount: purchases_mat_test.amount | min(),

// create a table for the tests and load data into it
table := &v1alpha.Table{
TableName: "purchases_mat_test",
TableName: tableName,
TimeColumnName: "purchase_time",
EntityKeyColumnName: "customer_id",
SubsortColumnName: &wrapperspb.StringValue{
Expand All @@ -76,7 +79,7 @@ min_amount: purchases_mat_test.amount | min(),

AfterAll(func() {
// cleanup items created in the test
_, err := tableClient.DeleteTable(ctx, &v1alpha.DeleteTableRequest{TableName: "purchases_mat_test"})
_, err := tableClient.DeleteTable(ctx, &v1alpha.DeleteTableRequest{TableName: tableName})
Expect(err).ShouldNot(HaveOccurredGrpc())

cancel()
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/api/queries_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
. "github.com/kaskada-ai/kaskada/tests/integration/shared/matchers"
)

var _ = Describe("Queries V1", Ordered, func() {
var _ = Describe("Queries V1", Ordered, Label("pulsar"), func() {
var (
ctx context.Context
cancel context.CancelFunc
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/api/query_v1_csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var _ = Describe("Query V1 gRPC with csv", Ordered, func() {

BeforeAll(func() {
//get connection to wren
ctx, cancel, conn = grpcConfig.GetContextCancelConnection(10)
ctx, cancel, conn = grpcConfig.GetContextCancelConnection(20)
ctx = metadata.AppendToOutgoingContext(ctx, "client-id", *integrationClientID)

// get a grpc client for the table & compute services
Expand Down
Loading

0 comments on commit 4acddbe

Please sign in to comment.