Skip to content

Commit

Permalink
Add compat tests for micro
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Oct 17, 2023
1 parent e86edca commit 51872bc
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 11 deletions.
1 change: 0 additions & 1 deletion micro/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,6 @@ func AddService(nc *nats.Conn, config Config) (Service, error) {
opts = append(opts, WithEndpointQueueGroup(config.QueueGroup))
}
if err := svc.AddEndpoint("default", config.Endpoint.Handler, opts...); err != nil {
svc.asyncDispatcher.close()
return nil, err
}
}
Expand Down
144 changes: 134 additions & 10 deletions test/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/micro"
)

type objectStepConfig[T any] struct {
Expand All @@ -42,7 +43,7 @@ type objectStepConfig[T any] struct {

func TestCompatibilityObjectStoreDefaultBucket(t *testing.T) {
t.Parallel()
nc, js := connect(t)
nc, js := connectJS(t)
defer nc.Close()

// setup subscription on which tester will be sending requests
Expand Down Expand Up @@ -73,7 +74,7 @@ func TestCompatibilityObjectStoreDefaultBucket(t *testing.T) {

func TestCompatibilityObjectStoreCustomBucket(t *testing.T) {
t.Parallel()
nc, js := connect(t)
nc, js := connectJS(t)
defer nc.Close()

// setup subscription on which tester will be sending requests
Expand Down Expand Up @@ -112,7 +113,7 @@ func TestCompatibilityObjectStoreGetObject(t *testing.T) {
Object string `json:"object"`
}

nc, js := connect(t)
nc, js := connectJS(t)
defer nc.Close()

// setup subscription on which tester will be sending requests
Expand Down Expand Up @@ -159,7 +160,7 @@ func TestCompatibilityObjectStoreGetObject(t *testing.T) {
func TestCompatibilityObjectStorePutObject(t *testing.T) {
t.Parallel()

nc, js := connect(t)
nc, js := connectJS(t)
defer nc.Close()

// setup subscription on which tester will be sending requests
Expand Down Expand Up @@ -204,7 +205,7 @@ func TestCompatibilityObjectStorePutObject(t *testing.T) {
func TestCompatibilityObjectStoreUpdateMetadata(t *testing.T) {
t.Parallel()

nc, js := connect(t)
nc, js := connectJS(t)
defer nc.Close()

// setup subscription on which tester will be sending requests
Expand Down Expand Up @@ -244,7 +245,7 @@ func TestCompatibilityObjectStoreWatch(t *testing.T) {
Object string `json:"object"`
}

nc, js := connect(t)
nc, js := connectJS(t)
defer nc.Close()

// setup subscription on which tester will be sending requests
Expand Down Expand Up @@ -314,7 +315,7 @@ func TestCompatibilityObjectStoreWatchUpdates(t *testing.T) {
Object string `json:"object"`
}

nc, js := connect(t)
nc, js := connectJS(t)
defer nc.Close()

// setup subscription on which tester will be sending requests
Expand Down Expand Up @@ -362,7 +363,7 @@ func TestCompatibilityObjectStoreGetLink(t *testing.T) {
Object string `json:"object"`
}

nc, js := connect(t)
nc, js := connectJS(t)
defer nc.Close()

// setup subscription on which tester will be sending requests
Expand Down Expand Up @@ -415,7 +416,7 @@ func TestCompatibilityObjectStorePutLink(t *testing.T) {
LinkName string `json:"link_name"`
}

nc, js := connect(t)
nc, js := connectJS(t)
defer nc.Close()

// setup subscription on which tester will be sending requests
Expand Down Expand Up @@ -465,7 +466,7 @@ func validateTestResult(t *testing.T, sub *nats.Subscription) {
}
}

func connect(t *testing.T) (*nats.Conn, nats.JetStreamContext) {
func connect(t *testing.T) *nats.Conn {
t.Helper()
natsURL := os.Getenv("NATS_URL")
if natsURL == "" {
Expand All @@ -475,9 +476,132 @@ func connect(t *testing.T) (*nats.Conn, nats.JetStreamContext) {
if err != nil {
t.Fatalf("Error connecting to NATS: %v", err)
}
return nc
}

func connectJS(t *testing.T) (*nats.Conn, nats.JetStreamContext) {
nc := connect(t)
js, err := nc.JetStream()
if err != nil {
t.Fatalf("Error getting JetStream context: %v", err)
}
return nc, js
}

type serviceStepConfig[T any] struct {
Suite string `json:"suite"`
Test string `json:"test"`
Command string `json:"command"`
Config T `json:"config"`
}

func TestService(t *testing.T) {
t.Parallel()
nc := connect(t)
defer nc.Close()

type groupConfig struct {
Name string `json:"name"`
QueueGroup string `json:"queue_group"`
}

type endpointConfig struct {
micro.EndpointConfig
Name string `json:"name"`
Group string `json:"group"`
}

type config struct {
micro.Config
Groups []groupConfig `json:"groups"`
Endpoints []endpointConfig `json:"endpoints"`
}

echoHandler := micro.HandlerFunc(func(req micro.Request) {
req.Respond(req.Data())
})

errHandler := micro.HandlerFunc(func(req micro.Request) {
req.Error("500", "handler error", nil)
})

// setup subscription on which tester will be sending requests
sub, err := nc.SubscribeSync("tests.service.service.>")
if err != nil {
t.Fatalf("Error subscribing to test subject: %v", err)
}
defer sub.Unsubscribe()

// 1. Get service and endpoint configs
msg, err := sub.NextMsg(1 * time.Hour)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}

var cfg serviceStepConfig[*config]
if err := json.Unmarshal(msg.Data, &cfg); err != nil {
t.Fatalf("Error unmarshalling message: %v", err)
}

var services []micro.Service
svcCfg := cfg.Config
svc, err := micro.AddService(nc, svcCfg.Config)
if err != nil {
t.Fatalf("Error adding service: %v", err)
}
groups := make(map[string]micro.Group)
for _, groupCfg := range svcCfg.Groups {
opts := []micro.GroupOpt{}
if groupCfg.QueueGroup != "" {
opts = append(opts, micro.WithGroupQueueGroup(groupCfg.QueueGroup))
}
groups[groupCfg.Name] = svc.AddGroup(groupCfg.Name, opts...)
}
for _, endpointCfg := range svcCfg.Endpoints {
opts := []micro.EndpointOpt{
micro.WithEndpointSubject(endpointCfg.Subject),
}
if endpointCfg.QueueGroup != "" {
opts = append(opts, micro.WithEndpointQueueGroup(endpointCfg.QueueGroup))
}
if endpointCfg.Metadata != nil {
opts = append(opts, micro.WithEndpointMetadata(endpointCfg.Metadata))
}
handler := echoHandler
if endpointCfg.Name == "faulty" {
handler = errHandler
}
if endpointCfg.Group != "" {
g := groups[endpointCfg.Group]
if g == nil {
t.Fatalf("Group %q not found", endpointCfg.Group)
}
if err := g.AddEndpoint(endpointCfg.Name, handler, opts...); err != nil {
t.Fatalf("Error adding endpoint: %v", err)
}
} else {
if err := svc.AddEndpoint(endpointCfg.Name, handler, opts...); err != nil {
t.Fatalf("Error adding endpoint: %v", err)
}
}
}
services = append(services, svc)

if err := msg.Respond(nil); err != nil {
t.Fatalf("Error responding to message: %v", err)
}

// 2. Stop services
msg, err = sub.NextMsg(1 * time.Hour)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
for _, svc := range services {
svc.Stop()
}
if err := msg.Respond(nil); err != nil {
t.Fatalf("Error responding to message: %v", err)
}

validateTestResult(t, sub)
}

0 comments on commit 51872bc

Please sign in to comment.