diff --git a/micro/request.go b/micro/request.go index eb4c0b88d..380f4945c 100644 --- a/micro/request.go +++ b/micro/request.go @@ -71,6 +71,11 @@ type ( msg *nats.Msg respondError error } + + serviceError struct { + Code string `json:"code"` + Description string `json:"description"` + } ) var ( @@ -144,6 +149,11 @@ func (r *request) Error(code, description string, data []byte, opts ...RespondOp r.respondError = err return err } + r.respondError = &serviceError{ + Code: code, + Description: description, + } + return nil } @@ -187,3 +197,7 @@ func (h Headers) Get(key string) string { func (h Headers) Values(key string) []string { return nats.Header(h).Values(key) } + +func (e *serviceError) Error() string { + return fmt.Sprintf("%s:%s", e.Code, e.Description) +} diff --git a/micro/service.go b/micro/service.go index a1067c835..d3a8fcdf2 100644 --- a/micro/service.go +++ b/micro/service.go @@ -351,7 +351,6 @@ func AddService(nc *nats.Conn, config Config) (Service, error) { opts = append(opts, WithEndpointQueueGroup(config.QueueGroup)) } if err := svc.AddEndpoint("default", config.Endpoint.Handler, opts...); err != nil { - svc.asyncDispatcher.close() return nil, err } } @@ -675,8 +674,8 @@ func (s *service) Stop() error { s.stopped = true if s.DoneHandler != nil { s.asyncDispatcher.push(func() { s.DoneHandler(s) }) - s.asyncDispatcher.close() } + s.asyncDispatcher.close() return nil } diff --git a/test/compat_test.go b/test/compat_test.go index 928787899..4fd3bcce3 100644 --- a/test/compat_test.go +++ b/test/compat_test.go @@ -28,6 +28,7 @@ import ( "time" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/micro" ) type objectStepConfig[T any] struct { @@ -42,7 +43,7 @@ type objectStepConfig[T any] struct { func TestCompatibilityObjectStoreDefaultBucket(t *testing.T) { t.Parallel() - nc, js := connect(t) + nc, js := connectJS(t) defer nc.Close() // setup subscription on which tester will be sending requests @@ -73,7 +74,7 @@ func TestCompatibilityObjectStoreDefaultBucket(t *testing.T) { func TestCompatibilityObjectStoreCustomBucket(t *testing.T) { t.Parallel() - nc, js := connect(t) + nc, js := connectJS(t) defer nc.Close() // setup subscription on which tester will be sending requests @@ -112,7 +113,7 @@ func TestCompatibilityObjectStoreGetObject(t *testing.T) { Object string `json:"object"` } - nc, js := connect(t) + nc, js := connectJS(t) defer nc.Close() // setup subscription on which tester will be sending requests @@ -159,7 +160,7 @@ func TestCompatibilityObjectStoreGetObject(t *testing.T) { func TestCompatibilityObjectStorePutObject(t *testing.T) { t.Parallel() - nc, js := connect(t) + nc, js := connectJS(t) defer nc.Close() // setup subscription on which tester will be sending requests @@ -204,7 +205,7 @@ func TestCompatibilityObjectStorePutObject(t *testing.T) { func TestCompatibilityObjectStoreUpdateMetadata(t *testing.T) { t.Parallel() - nc, js := connect(t) + nc, js := connectJS(t) defer nc.Close() // setup subscription on which tester will be sending requests @@ -244,7 +245,7 @@ func TestCompatibilityObjectStoreWatch(t *testing.T) { Object string `json:"object"` } - nc, js := connect(t) + nc, js := connectJS(t) defer nc.Close() // setup subscription on which tester will be sending requests @@ -314,7 +315,7 @@ func TestCompatibilityObjectStoreWatchUpdates(t *testing.T) { Object string `json:"object"` } - nc, js := connect(t) + nc, js := connectJS(t) defer nc.Close() // setup subscription on which tester will be sending requests @@ -362,7 +363,7 @@ func TestCompatibilityObjectStoreGetLink(t *testing.T) { Object string `json:"object"` } - nc, js := connect(t) + nc, js := connectJS(t) defer nc.Close() // setup subscription on which tester will be sending requests @@ -415,7 +416,7 @@ func TestCompatibilityObjectStorePutLink(t *testing.T) { LinkName string `json:"link_name"` } - nc, js := connect(t) + nc, js := connectJS(t) defer nc.Close() // setup subscription on which tester will be sending requests @@ -465,7 +466,7 @@ func validateTestResult(t *testing.T, sub *nats.Subscription) { } } -func connect(t *testing.T) (*nats.Conn, nats.JetStreamContext) { +func connect(t *testing.T) *nats.Conn { t.Helper() natsURL := os.Getenv("NATS_URL") if natsURL == "" { @@ -475,9 +476,135 @@ func connect(t *testing.T) (*nats.Conn, nats.JetStreamContext) { if err != nil { t.Fatalf("Error connecting to NATS: %v", err) } + return nc +} + +func connectJS(t *testing.T) (*nats.Conn, nats.JetStreamContext) { + nc := connect(t) js, err := nc.JetStream() if err != nil { t.Fatalf("Error getting JetStream context: %v", err) } return nc, js } + +type serviceStepConfig[T any] struct { + Suite string `json:"suite"` + Test string `json:"test"` + Command string `json:"command"` + Config T `json:"config"` +} + +func TestTestCompatibilityService(t *testing.T) { + t.Parallel() + nc := connect(t) + defer nc.Close() + + type groupConfig struct { + Name string `json:"name"` + QueueGroup string `json:"queue_group"` + } + + type endpointConfig struct { + micro.EndpointConfig + Name string `json:"name"` + Group string `json:"group"` + } + + type config struct { + micro.Config + Groups []groupConfig `json:"groups"` + Endpoints []endpointConfig `json:"endpoints"` + } + + echoHandler := micro.HandlerFunc(func(req micro.Request) { + req.Respond(req.Data()) + }) + + errHandler := micro.HandlerFunc(func(req micro.Request) { + req.Error("500", "handler error", nil) + }) + + // setup subscription on which tester will be sending requests + sub, err := nc.SubscribeSync("tests.service.core.>") + if err != nil { + t.Fatalf("Error subscribing to test subject: %v", err) + } + defer sub.Unsubscribe() + + // 1. Get service and endpoint configs + msg, err := sub.NextMsg(1 * time.Hour) + if err != nil { + t.Fatalf("Error getting message: %v", err) + } + + var cfg serviceStepConfig[*config] + if err := json.Unmarshal(msg.Data, &cfg); err != nil { + t.Fatalf("Error unmarshalling message: %v", err) + } + + var services []micro.Service + svcCfg := cfg.Config + svcCfg.StatsHandler = func(e *micro.Endpoint) any { + return map[string]string{"endpoint": e.Name} + } + svc, err := micro.AddService(nc, svcCfg.Config) + if err != nil { + t.Fatalf("Error adding service: %v", err) + } + groups := make(map[string]micro.Group) + for _, groupCfg := range svcCfg.Groups { + opts := []micro.GroupOpt{} + if groupCfg.QueueGroup != "" { + opts = append(opts, micro.WithGroupQueueGroup(groupCfg.QueueGroup)) + } + groups[groupCfg.Name] = svc.AddGroup(groupCfg.Name, opts...) + } + for _, endpointCfg := range svcCfg.Endpoints { + opts := []micro.EndpointOpt{ + micro.WithEndpointSubject(endpointCfg.Subject), + } + if endpointCfg.QueueGroup != "" { + opts = append(opts, micro.WithEndpointQueueGroup(endpointCfg.QueueGroup)) + } + if endpointCfg.Metadata != nil { + opts = append(opts, micro.WithEndpointMetadata(endpointCfg.Metadata)) + } + handler := echoHandler + if endpointCfg.Name == "faulty" { + handler = errHandler + } + if endpointCfg.Group != "" { + g := groups[endpointCfg.Group] + if g == nil { + t.Fatalf("Group %q not found", endpointCfg.Group) + } + if err := g.AddEndpoint(endpointCfg.Name, handler, opts...); err != nil { + t.Fatalf("Error adding endpoint: %v", err) + } + } else { + if err := svc.AddEndpoint(endpointCfg.Name, handler, opts...); err != nil { + t.Fatalf("Error adding endpoint: %v", err) + } + } + } + services = append(services, svc) + + if err := msg.Respond(nil); err != nil { + t.Fatalf("Error responding to message: %v", err) + } + + // 2. Stop services + msg, err = sub.NextMsg(1 * time.Hour) + if err != nil { + t.Fatalf("Error getting message: %v", err) + } + for _, svc := range services { + svc.Stop() + } + if err := msg.Respond(nil); err != nil { + t.Fatalf("Error responding to message: %v", err) + } + + validateTestResult(t, sub) +} diff --git a/test/configs/docker/Dockerfile b/test/configs/docker/Dockerfile index 450acedea..037430d83 100644 --- a/test/configs/docker/Dockerfile +++ b/test/configs/docker/Dockerfile @@ -4,4 +4,4 @@ COPY . /usr/src/nats.go RUN go mod tidy -modfile go_test.mod RUN go test -run TestNone -modfile go_test.mod -tags compat ./test/... ENV NATS_URL=localhost:4222 -CMD go test -v -run TestCompatibility -modfile go_test.mod -tags compat ./test/... -count 1 -parallel 10 +ENTRYPOINT ["go", "test", "-v", "-modfile", "go_test.mod", "-tags", "compat", "./test/...", "-count", "1", "-parallel", "10", "-run"]