Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NATS transport #680

Merged
merged 1 commit into from
Apr 27, 2018
Merged

NATS transport #680

merged 1 commit into from
Apr 27, 2018

Conversation

kirooha
Copy link
Contributor

@kirooha kirooha commented Mar 15, 2018

The PR contains transport for NATS (messaging system, https://nats.io)
I saw a request with the similar functional #623
Since the PR is still open, I want to suggest my PR.

@kirooha kirooha changed the title Transport for nats NATS transport Mar 15, 2018
@peterbourgon
Copy link
Member

Oh, cool.. ! I would want to also see Publisher type (corresponding to Client), can that be added?

@kirooha
Copy link
Contributor Author

kirooha commented Mar 15, 2018

yes, sure! I am adding

"flag"

"github.com/go-kit/kit/endpoint"
natstransport "github.com/kirooha/kit/transport/nats"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

github.com/kirooha/kit/transport/nats - maybe need to change to github.com/go-kit/kit/transport/nats

}

if !p.async {
resp, err := p.publisher.Request(p.subject, msg.Data, p.timeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you already have a context you can use RequestWithContext instead and remove the timeout field.


}

if p.reply == "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just use PublishMsg and than remove the rest of the method.

type DecodeRequestFunc func(context.Context, *nats.Msg) (request interface{}, err error)


// EncodeRequestFunc encodes the passed request object into the HTTP request
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment still talks about HTTP

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, great! thanks a lot! I didn't hope to so fast review) Actually, it was a draft and I supposed to modify comments and test code. One more time, thanks a lot! I will fix your remarks as soon as possible.

// subscribers, after invoking the endpoint but prior to publishing a reply.
type SubscriberResponseFunc func(context.Context, *nats.Conn) context.Context

// ClientResponseFunc may take information from an HTTP request and make the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HTTP

// ServeMsg provides nats.MsgHandler.
func (s Subscriber) ServeMsg(nc *nats.Conn) func(msg *nats.Msg) {
return func(msg *nats.Msg) {
ctx := context.Background()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The context is never cancelled

ctx = f(ctx, &msg)
}

if !p.async {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The endpoint tries to handle both simple publish operations and requests which feels kinda odd. Also the !async case ignores the p.reply topic.

I suggest to remove the async case and the custom reply topic for know and only do a simple Request(WithContext). We can always add support for messages without reply later through options or a different type.

reply: reply,
enc: enc,
dec: dec,
before: []RequestFunc{},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to initialize this, Go already handles nil correct for all the cases needed.

// ServeMsg provides nats.MsgHandler.
func (s Subscriber) ServeMsg(nc *nats.Conn) func(msg *nats.Msg) {
return func(msg *nats.Msg) {
ctx := context.Background()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ctx should be cancelled when the handler returns

return nil, err
}

return nil, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The funcs in p.after should probably still be called here and below with a nil *nats.Msg

@kirooha
Copy link
Contributor Author

kirooha commented Mar 22, 2018

@nussjustin all remarks are fixed

Copy link
Contributor

@nussjustin nussjustin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 small comment, but other than that the API looks good to me.

This needs some tests though ;-)

For testing publishers and subscribers you can start real NATS servers via the gnatsd/server package (see Server.Start and Server.ReadyForConnections).

// SubscriberRequestFunc may take information from a publisher request and put it into a
// request context. In Subscribers, RequestFuncs are executed prior to invoking the
// endpoint.
type SubscriberRequestFunc func(context.Context, *nats.Msg) context.Context
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, you were right. This should be just RequestFunc. I forgot that this is used for both Publishers and Subscribers. That's what I get for not reviewing my comments before requesting changes...

@kirooha
Copy link
Contributor Author

kirooha commented Apr 10, 2018

there's some weird behavior of ci in this pr. I did 3 force push without any changes (amend commit that added blank lines). 2 builds were failed and the latest is succeeded. I mean ci/circleci and continuous-integration/travis-ci/pr

@kirooha
Copy link
Contributor Author

kirooha commented Apr 11, 2018

@nussjustin tests have been added

"strings"

"github.com/nats-io/go-nats"
natstransport "github.com/kirooha/kit/transport/nats"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrong import path

"github.com/nats-io/go-nats"
"github.com/nats-io/gnatsd/server"

natstransport "github.com/kirooha/kit/transport/nats"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also wrong

@kirooha
Copy link
Contributor Author

kirooha commented Apr 15, 2018

@nussjustin fixed wrong paths, squashed commits to the only one with a more detailed comment. do you have any notes?

@nussjustin
Copy link
Contributor

Could you please add a test for PublisherTimeout? From what I see it's the only publisher option that's not tested right now (as reported by go tool cover).

Otherwise LGTM so far.

@kirooha
Copy link
Contributor Author

kirooha commented Apr 15, 2018

done

defer nc.Close()

sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", func(msg *nats.Msg) {
ch := make(chan struct{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're leaking the goroutine here. Even if this is just a test it shouldn't leak goroutines. Just create the channel before the QueueSubscribe and close it after the test, like this:

ch := make(chan struct{})
defer close(ch)

sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", func(msg *nats.Msg) {
	<-ch
})

NATS is an open-source, cloud-native messaging system
(https://www.nats.io).
The functional provides API that lets one works with NATS
in a similar way as HTTP.

- nats.MsgHandler could be used in queue or simple subscriber
- Sync publisher
@kirooha
Copy link
Contributor Author

kirooha commented Apr 16, 2018

thank you! awesome note, fixed

Copy link
Contributor

@nussjustin nussjustin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see anything obviously wrong / bad right now sooo... passing on the review torch to @peterbourgon and co.

@peterbourgon
Copy link
Member

Great, thank you. Give me a week or so; I'm on a work trip and have a vacation immediately afterwards.

@peterbourgon
Copy link
Member

Yeah I really don't have any nits to pick here, this LGTM! Thanks very much for the hard work!

@peterbourgon peterbourgon merged commit 62e6b81 into go-kit:master Apr 27, 2018
jamesgist pushed a commit to jamesgist/kit that referenced this pull request Nov 1, 2024
NATS is an open-source, cloud-native messaging system
(https://www.nats.io).
The functional provides API that lets one works with NATS
in a similar way as HTTP.

- nats.MsgHandler could be used in queue or simple subscriber
- Sync publisher
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants