Skip to content

Commit

Permalink
table to pulsar test now passing
Browse files Browse the repository at this point in the history
  • Loading branch information
epinzur committed Jun 22, 2023
1 parent 15026d0 commit 891d2fc
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 118 deletions.
14 changes: 14 additions & 0 deletions tests/integration/api/api_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/RedisAI/redisai-go/redisai"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/gomodule/redigo/redis"
_ "github.com/lib/pq"

Expand Down Expand Up @@ -221,3 +222,16 @@ func getPulsarConfig(topicName string) *v1alpha.PulsarConfig {
TopicName: topicName,
}
}

func receivePulsarMessageWithTimeout(pulsarConsumer pulsar.Consumer, ctx context.Context) pulsar.Message {
timeout, timeoutCancel := context.WithTimeout(ctx, 100 * time.Millisecond)
defer timeoutCancel()
msg, err := pulsarConsumer.Receive(timeout)
if err != nil {
helpers.LogLn("timed out waiting for puslar message")
return nil
} else {
helpers.LogLn("recieved pulsar response: %v", msg)
return msg
}
}
2 changes: 1 addition & 1 deletion tests/integration/api/mat_pulsar_to_obj_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type pulsarTestSchema struct {
MinAmount int `json:"min_amount"`
}

var _ = FDescribe("Materialization from Pulsar to ObjectStore", Ordered, Label("pulsar"), func() {
var _ = Describe("Materialization from Pulsar to ObjectStore", Ordered, Label("pulsar"), func() {
var (
ctx context.Context
cancel context.CancelFunc
Expand Down
32 changes: 9 additions & 23 deletions tests/integration/api/mat_pulsar_to_pulsar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,6 @@ var _ = PDescribe("Materialization from Pulsar to Pulsar", Ordered, Label("pulsa
_, err = pulsarProducer.Send(ctx, &pulsar.ProducerMessage{
Payload: helpers.ReadFile("avro/msg_1.avro"),
})
_, err = pulsarProducer.Send(ctx, &pulsar.ProducerMessage{
Payload: helpers.ReadFile("avro/msg_2.avro"),
})
Expect(err).ShouldNot(HaveOccurred(), "failed to publish message")
_, err = pulsarProducer.Send(ctx, &pulsar.ProducerMessage{
Payload: helpers.ReadFile("avro/msg_3.avro"),
})
Expect(err).ShouldNot(HaveOccurred(), "failed to publish message")

// create a pulsar consumer
pulsarConsumer, err = pulsarClient.Subscribe(pulsar.ConsumerOptions{
Expand Down Expand Up @@ -173,12 +165,8 @@ var _ = PDescribe("Materialization from Pulsar to Pulsar", Ordered, Label("pulsa

It("Should output initial results to pulsar", func() {
Eventually(func(g Gomega) {
timeout, timeoutCancel := context.WithTimeout(ctx, 100 * time.Millisecond)
defer timeoutCancel()
msg, err := pulsarConsumer.Receive(timeout)

helpers.LogLn("recieved: %v, err: %v", msg, err)
g.Expect(err).ShouldNot(HaveOccurred())
msg := receivePulsarMessageWithTimeout(pulsarConsumer, ctx)
g.Expect(msg).ShouldNot(BeNil())

var data pulsarToPulsarTestSchema
err = json.Unmarshal(msg.Payload(), &data)
Expand All @@ -187,29 +175,27 @@ var _ = PDescribe("Materialization from Pulsar to Pulsar", Ordered, Label("pulsa
g.Expect(data.LastTime).Should(Equal(10))
g.Expect(data.Count).Should(Equal(1))

pulsarConsumer.Ack(msg)
g.Expect(pulsarConsumer.Ack(msg)).Should(Succeed())
}, "5s", "1s").Should(Succeed())
})
})

Describe("Load the more data into the table", func() {
Describe("Load more data into the table", func() {
It("Should work without error", func() {
_, err = pulsarProducer.Send(ctx, &pulsar.ProducerMessage{
Payload: helpers.ReadFile("avro/msg_4.avro"),
Payload: helpers.ReadFile("avro/msg_2.avro"),
})
Expect(err).ShouldNot(HaveOccurred(), "failed to publish message")
_, err = pulsarProducer.Send(ctx, &pulsar.ProducerMessage{
Payload: helpers.ReadFile("avro/msg_5.avro"),
Payload: helpers.ReadFile("avro/msg_3.avro"),
})
Expect(err).ShouldNot(HaveOccurred(), "failed to publish message")
})

It("Should output additional results to pulsar", func() {
Eventually(func(g Gomega) {
timeout, timeoutCancel := context.WithTimeout(ctx, 100 * time.Millisecond)
defer timeoutCancel()
msg, err := pulsarConsumer.Receive(timeout)
g.Expect(err).ShouldNot(HaveOccurred())
msg := receivePulsarMessageWithTimeout(pulsarConsumer, ctx)
g.Expect(msg).ShouldNot(BeNil())

var data pulsarToPulsarTestSchema
err = json.Unmarshal(msg.Payload(), &data)
Expand All @@ -218,7 +204,7 @@ var _ = PDescribe("Materialization from Pulsar to Pulsar", Ordered, Label("pulsa
g.Expect(data.LastTime).Should(Equal(20))
g.Expect(data.Count).Should(Equal(2))

pulsarConsumer.Ack(msg)
g.Expect(pulsarConsumer.Ack(msg)).Should(Succeed())
}, "5s", "1s").Should(Succeed())
})
})
Expand Down
196 changes: 102 additions & 94 deletions tests/integration/api/mat_table_to_pulsar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
pulsaradmin "github.com/streamnative/pulsar-admin-go"
"github.com/streamnative/pulsar-admin-go/pkg/utils"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/wrapperspb"
Expand All @@ -19,18 +21,21 @@ import (
. "github.com/kaskada-ai/kaskada/tests/integration/shared/matchers"
)

var _ = PDescribe("Materialization with Pulsar upload", Ordered, Label("pulsar"), func() {
var _ = Describe("Materialization with Pulsar upload", Ordered, Label("pulsar"), func() {
var (
ctx context.Context
cancel context.CancelFunc
conn *grpc.ClientConn
err error
pulsarClient pulsar.Client
pulsarConsumer pulsar.Consumer
table *v1alpha.Table
tableClient v1alpha.TableServiceClient
tableName string
topicUrl string
topicName string
materializationClient v1alpha.MaterializationServiceClient
materializationName string
msg pulsar.Message
)

BeforeAll(func() {
Expand All @@ -41,6 +46,7 @@ var _ = PDescribe("Materialization with Pulsar upload", Ordered, Label("pulsar")
// get a grpc client for the table & materialization services
tableClient = v1alpha.NewTableServiceClient(conn)
materializationClient = v1alpha.NewMaterializationServiceClient(conn)
materializationName = "mat_tableToPulsar"

// create a pulsar client
pulsarClient, err = pulsar.NewClient(pulsar.ClientOptions{
Expand All @@ -49,8 +55,17 @@ var _ = PDescribe("Materialization with Pulsar upload", Ordered, Label("pulsar")
})
Expect(err).ShouldNot(HaveOccurred())

// create a pulsar consumer
topicName = "topic_tableToPulsar"
pulsarConsumer, err = pulsarClient.Subscribe(pulsar.ConsumerOptions{
Topic: topicName,
SubscriptionName: uuid.New().String(),
Type: pulsar.Shared,
})
Expect(err).ShouldNot(HaveOccurred(), "issue creating pulsar consumer")

// create a table
tableName = "PulsarTable"
tableName = "table_tableToPulsar"
table = &v1alpha.Table{
TableName: tableName,
TimeColumnName: "purchase_time",
Expand All @@ -68,83 +83,83 @@ var _ = PDescribe("Materialization with Pulsar upload", Ordered, Label("pulsar")

AfterAll(func() {
// clean up items from the test
materializationClient.DeleteMaterialization(ctx, &v1alpha.DeleteMaterializationRequest{MaterializationName: "purchase_min_and_max"})
materializationClient.DeleteMaterialization(ctx, &v1alpha.DeleteMaterializationRequest{MaterializationName: materializationName})
tableClient.DeleteTable(ctx, &v1alpha.DeleteTableRequest{TableName: tableName})

cancel()
conn.Close()
pulsarConsumer.Close()
pulsarClient.Close()

// attempt to delete pulsar topic used in test
cfg := &pulsaradmin.Config{}
cfg.WebServiceURL = "http://localhost:8080"
admin, err := pulsaradmin.NewClient(cfg)
Expect(err).ShouldNot(HaveOccurred(), "issue getting puslar admin client")
Expect(err).ShouldNot(HaveOccurred())
topic, _ := utils.GetTopicName(fmt.Sprintf("public/default/%s", topicName))
err = admin.Topics().Delete(*topic, true, true)
Expect(err).ShouldNot(HaveOccurred(), "issue deleting pulsar in topic")
})

Describe("Create a materialization", func() {
It("Should work without error", func() {
topicName := uuid.New().String()
createRequest := &v1alpha.CreateMaterializationRequest{
Materialization: &v1alpha.Materialization{
MaterializationName: "purchase_min_and_max",
MaterializationName: materializationName,
Expression: `
{
key: PulsarTable.customer_id,
max_amount: PulsarTable.amount | max(),
min_amount: PulsarTable.amount | min(),
key: table_tableToPulsar.customer_id,
max_amount: table_tableToPulsar.amount | max(),
min_amount: table_tableToPulsar.amount | min(),
}`,
Destination: &v1alpha.Destination{
Destination: &v1alpha.Destination_Pulsar{
Pulsar: &v1alpha.PulsarDestination{
Config: &v1alpha.PulsarConfig{
BrokerServiceUrl: "pulsar://pulsar:6650",
Tenant: "public",
Namespace: "default",
TopicName: topicName,
},
Config: getPulsarConfig(topicName),
},
},
},
},
}
topicUrl = "persistent://public/default/" + topicName

res, err := materializationClient.CreateMaterialization(ctx, createRequest)
Expect(err).ShouldNot(HaveOccurredGrpc())
Expect(res).ShouldNot(BeNil())
VerifyRequestDetails(res.RequestDetails)
})

It("Should upload results to pulsar", func() {
consumer, err := pulsarClient.Subscribe(pulsar.ConsumerOptions{
Topic: topicUrl,
SubscriptionName: uuid.New().String(),
Type: pulsar.Shared,
})
Expect(err).ShouldNot(HaveOccurred())
defer consumer.Close()

// Verify the first message
var data pulsarTestSchema
msg, err := consumer.Receive(context.Background())
Expect(err).ShouldNot(HaveOccurred())

err = json.Unmarshal(msg.Payload(), &data)
Expect(err).ShouldNot(HaveOccurred())
Expect(data.Key).Should(Equal("karen"))
Expect(data.MaxAmount).Should(Equal(9))
Expect(data.MinAmount).Should(Equal(2))

consumer.Ack(msg)
time.Sleep(1 * time.Second) // add a delay for testing purposes
Eventually(func(g Gomega) {
msg = receivePulsarMessageWithTimeout(pulsarConsumer, ctx)
g.Expect(msg).ShouldNot(BeNil())

var data pulsarTestSchema
err = json.Unmarshal(msg.Payload(), &data)
g.Expect(err).ShouldNot(HaveOccurred())
g.Expect(data.Key).Should(Equal("karen"))
g.Expect(data.MaxAmount).Should(Equal(9))
g.Expect(data.MinAmount).Should(Equal(2))

g.Expect(pulsarConsumer.Ack(msg)).Should(Succeed())
}, "5s", "1s").Should(Succeed())

// Verify the second message
msg, err = consumer.Receive(context.Background())
Expect(err).Should(BeNil())

err = json.Unmarshal(msg.Payload(), &data)
Expect(err).Should(BeNil())

// Verify the message fields
Expect(data.Key).Should(Equal("patrick"))
Expect(data.MaxAmount).Should(Equal(5000))
Expect(data.MinAmount).Should(Equal(3))
consumer.Ack(msg)

consumer.Close()
Eventually(func(g Gomega) {
msg = receivePulsarMessageWithTimeout(pulsarConsumer, ctx)
g.Expect(msg).ShouldNot(BeNil())
var data pulsarTestSchema
err = json.Unmarshal(msg.Payload(), &data)
g.Expect(err).ShouldNot(HaveOccurred())
g.Expect(data.Key).Should(Equal("patrick"))
g.Expect(data.MaxAmount).Should(Equal(5000))
g.Expect(data.MinAmount).Should(Equal(3))

g.Expect(pulsarConsumer.Ack(msg)).Should(Succeed())
}, "5s", "1s").Should(Succeed())
})
})

Expand All @@ -154,58 +169,51 @@ min_amount: PulsarTable.amount | min(),
})

It("Should upload results to pulsar", func() {
consumer, err := pulsarClient.Subscribe(pulsar.ConsumerOptions{
Topic: topicUrl,
SubscriptionName: uuid.New().String(),
Type: pulsar.Shared,
})
Expect(err).Should(BeNil())
defer consumer.Close()

// Verify the first message
var data pulsarTestSchema
msg, err := consumer.Receive(context.Background())
Expect(err).Should(BeNil())
fmt.Printf("\nMessage: %s\n", msg.Payload())

err = json.Unmarshal(msg.Payload(), &data)
Expect(err).Should(BeNil())
Expect(data.Key).Should(Equal("karen"))
Expect(data.MaxAmount).Should(Equal(9))
Expect(data.MinAmount).Should(Equal(2))
consumer.Ack(msg)
time.Sleep(1 * time.Second) // add a delay for testing purposes
Eventually(func(g Gomega) {
msg = receivePulsarMessageWithTimeout(pulsarConsumer, ctx)
g.Expect(msg).ShouldNot(BeNil())

var data pulsarTestSchema
err = json.Unmarshal(msg.Payload(), &data)
g.Expect(err).ShouldNot(HaveOccurred())
g.Expect(data.Key).Should(Equal("karen"))
g.Expect(data.MaxAmount).Should(Equal(9))
g.Expect(data.MinAmount).Should(Equal(2))

g.Expect(pulsarConsumer.Ack(msg)).Should(Succeed())
}, "5s", "1s").Should(Succeed())

// Verify the second message
msg, err = consumer.Receive(context.Background())
Expect(err).Should(BeNil())
fmt.Printf("\nMessage: %s\n", msg.Payload())

err = json.Unmarshal(msg.Payload(), &data)
Expect(err).Should(BeNil())

// Verify the message fields
Expect(data.Key).Should(Equal("spongebob"))
Expect(data.MaxAmount).Should(Equal(34))
Expect(data.MinAmount).Should(Equal(7))
consumer.Ack(msg)
time.Sleep(1 * time.Second) // add a delay for testing purposes
Eventually(func(g Gomega) {
msg = receivePulsarMessageWithTimeout(pulsarConsumer, ctx)
g.Expect(msg).ShouldNot(BeNil())
var data pulsarTestSchema
err = json.Unmarshal(msg.Payload(), &data)
g.Expect(err).ShouldNot(HaveOccurred())
g.Expect(data.Key).Should(Equal("spongebob"))
g.Expect(data.MaxAmount).Should(Equal(34))
g.Expect(data.MinAmount).Should(Equal(7))

g.Expect(pulsarConsumer.Ack(msg)).Should(Succeed())
}, "5s", "1s").Should(Succeed())

// Verify the third message
msg, err = consumer.Receive(context.Background())
Expect(err).Should(BeNil())
fmt.Printf("\nMessage: %s\n", msg.Payload())

err = json.Unmarshal(msg.Payload(), &data)
Expect(err).Should(BeNil())

// Verify the message fields
Expect(data.Key).Should(Equal("patrick"))
Expect(data.MaxAmount).Should(Equal(5000))
Expect(data.MinAmount).Should(Equal(2))
consumer.Ack(msg)

consumer.Close()
Eventually(func(g Gomega) {
msg = receivePulsarMessageWithTimeout(pulsarConsumer, ctx)
g.Expect(msg).ShouldNot(BeNil())

var data pulsarTestSchema
err = json.Unmarshal(msg.Payload(), &data)
g.Expect(err).ShouldNot(HaveOccurred())
g.Expect(data.Key).Should(Equal("patrick"))
g.Expect(data.MaxAmount).Should(Equal(5000))
g.Expect(data.MinAmount).Should(Equal(2))

g.Expect(pulsarConsumer.Ack(msg)).Should(Succeed())
}, "5s", "1s").Should(Succeed())
})
})
})

0 comments on commit 891d2fc

Please sign in to comment.