Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate encoded connections #1674

Merged
merged 4 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 0 additions & 95 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,60 +134,6 @@ To find more information on `nats.go` JetStream API, visit
The service API (`micro`) allows you to [easily build NATS services](micro/README.md) The
services API is currently in beta release.

## Encoded Connections

```go

nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer c.Close()

// Simple Publisher
c.Publish("foo", "Hello World")

// Simple Async Subscriber
c.Subscribe("foo", func(s string) {
fmt.Printf("Received a message: %s\n", s)
})

// EncodedConn can Publish any raw Go type using the registered Encoder
type person struct {
Name string
Address string
Age int
}

// Go type Subscriber
c.Subscribe("hello", func(p *person) {
fmt.Printf("Received a person: %+v\n", p)
})

me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street, San Francisco, CA"}

// Go type Publisher
c.Publish("hello", me)

// Unsubscribe
sub, err := c.Subscribe("foo", nil)
// ...
sub.Unsubscribe()

// Requests
var response string
err = c.Request("help", "help me", &response, 10*time.Millisecond)
if err != nil {
fmt.Printf("Request failed: %v\n", err)
}

// Replying
c.Subscribe("help", func(subj, reply string, msg string) {
c.Publish(reply, "I can help!")
})

// Close connection
c.Close();
```

## New Authentication (Nkeys and User Credentials)
This requires server with version >= 2.0.0

Expand Down Expand Up @@ -267,34 +213,6 @@ if err != nil {

```

## Using Go Channels (netchan)

```go
nc, _ := nats.Connect(nats.DefaultURL)
ec, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer ec.Close()

type person struct {
Name string
Address string
Age int
}

recvCh := make(chan *person)
ec.BindRecvChan("hello", recvCh)

sendCh := make(chan *person)
ec.BindSendChan("hello", sendCh)

me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street"}

// Send via Go channels
sendCh <- me

// Receive via Go channels
who := <- recvCh
```

## Wildcard Subscriptions

```go
Expand Down Expand Up @@ -461,19 +379,6 @@ msg, err := nc.RequestWithContext(ctx, "foo", []byte("bar"))
sub, err := nc.SubscribeSync("foo")
msg, err := sub.NextMsgWithContext(ctx)

// Encoded Request with context
c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
type request struct {
Message string `json:"message"`
}
type response struct {
Code int `json:"code"`
}
req := &request{Message: "Hello"}
resp := &response{}
err := c.RequestWithContext(ctx, "foo", req, resp)
```

## Backwards compatibility

In the development of nats.go, we are committed to maintaining backward compatibility and ensuring a stable and reliable experience for all users. In general, we follow the standard go compatibility guidelines.
Expand Down
2 changes: 2 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ func (nc *Conn) FlushWithContext(ctx context.Context) error {
// RequestWithContext will create an Inbox and perform a Request
// using the provided cancellation context with the Inbox reply
// for the data v. A response will be decoded into the vPtr last parameter.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) RequestWithContext(ctx context.Context, subject string, v any, vPtr any) error {
if ctx == nil {
return ErrInvalidContext
Expand Down
34 changes: 34 additions & 0 deletions enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ import (
"github.com/nats-io/nats.go/encoders/builtin"
)

//lint:file-ignore SA1019 Ignore deprecation warnings for EncodedConn

// Encoder interface is for all register encoders
//
// Deprecated: Encoded connections are no longer supported.
type Encoder interface {
Encode(subject string, v any) ([]byte, error)
Decode(subject string, data []byte, vPtr any) error
Expand All @@ -51,13 +55,17 @@ func init() {
// EncodedConn are the preferred way to interface with NATS. They wrap a bare connection to
// a nats server and have an extendable encoder system that will encode and decode messages
// from raw Go types.
//
// Deprecated: Encoded connections are no longer supported.
type EncodedConn struct {
Conn *Conn
Enc Encoder
}

// NewEncodedConn will wrap an existing Connection and utilize the appropriate registered
// encoder.
//
// Deprecated: Encoded connections are no longer supported.
func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error) {
if c == nil {
return nil, errors.New("nats: Nil Connection")
Expand All @@ -73,13 +81,17 @@ func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error) {
}

// RegisterEncoder will register the encType with the given Encoder. Useful for customization.
//
// Deprecated: Encoded connections are no longer supported.
func RegisterEncoder(encType string, enc Encoder) {
encLock.Lock()
defer encLock.Unlock()
encMap[encType] = enc
}

// EncoderForType will return the registered Encoder for the encType.
//
// Deprecated: Encoded connections are no longer supported.
func EncoderForType(encType string) Encoder {
encLock.Lock()
defer encLock.Unlock()
Expand All @@ -88,6 +100,8 @@ func EncoderForType(encType string) Encoder {

// Publish publishes the data argument to the given subject. The data argument
// will be encoded using the associated encoder.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) Publish(subject string, v any) error {
b, err := c.Enc.Encode(subject, v)
if err != nil {
Expand All @@ -99,6 +113,8 @@ func (c *EncodedConn) Publish(subject string, v any) error {
// PublishRequest will perform a Publish() expecting a response on the
// reply subject. Use Request() for automatically waiting for a response
// inline.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) PublishRequest(subject, reply string, v any) error {
b, err := c.Enc.Encode(subject, v)
if err != nil {
Expand All @@ -110,6 +126,8 @@ func (c *EncodedConn) PublishRequest(subject, reply string, v any) error {
// Request will create an Inbox and perform a Request() call
// with the Inbox reply for the data v. A response will be
// decoded into the vPtr Response.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) Request(subject string, v any, vPtr any, timeout time.Duration) error {
b, err := c.Enc.Encode(subject, v)
if err != nil {
Expand Down Expand Up @@ -150,6 +168,8 @@ func (c *EncodedConn) Request(subject string, v any, vPtr any, timeout time.Dura
// and demarshal it into the given struct, e.g. person.
// There are also variants where the callback wants either the subject, or the
// subject and the reply subject.
//
// Deprecated: Encoded connections are no longer supported.
type Handler any

// Dissect the cb Handler's signature
Expand All @@ -170,13 +190,17 @@ var emptyMsgType = reflect.TypeOf(&Msg{})
// Subscribe will create a subscription on the given subject and process incoming
// messages using the specified Handler. The Handler should be a func that matches
// a signature from the description of Handler from above.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, error) {
return c.subscribe(subject, _EMPTY_, cb)
}

// QueueSubscribe will create a queue subscription on the given subject and process
// incoming messages using the specified Handler. The Handler should be a func that
// matches a signature from the description of Handler from above.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error) {
return c.subscribe(subject, queue, cb)
}
Expand Down Expand Up @@ -238,18 +262,24 @@ func (c *EncodedConn) subscribe(subject, queue string, cb Handler) (*Subscriptio
}

// FlushTimeout allows a Flush operation to have an associated timeout.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) FlushTimeout(timeout time.Duration) (err error) {
return c.Conn.FlushTimeout(timeout)
}

// Flush will perform a round trip to the server and return when it
// receives the internal reply.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) Flush() error {
return c.Conn.Flush()
}

// Close will close the connection to the server. This call will release
// all blocking calls, such as Flush(), etc.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) Close() {
c.Conn.Close()
}
Expand All @@ -259,11 +289,15 @@ func (c *EncodedConn) Close() {
// will be drained and can not publish any additional messages. Upon draining
// of the publishers, the connection will be closed. Use the ClosedCB()
// option to know when the connection has moved from draining to closed.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) Drain() error {
return c.Conn.Drain()
}

// LastError reports the last error encountered via the Connection.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) LastError() error {
return c.Conn.LastError()
}
6 changes: 6 additions & 0 deletions encoders/builtin/default_enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
// turn numbers into appropriate strings that can be decoded. It will also
// properly encoded and decode bools. If will encode a struct, but if you want
// to properly handle structures you should use JsonEncoder.
//
// Deprecated: Encoded connections are no longer supported.
type DefaultEncoder struct {
// Empty
}
Expand All @@ -35,6 +37,8 @@ var falseB = []byte("false")
var nilB = []byte("")

// Encode
//
// Deprecated: Encoded connections are no longer supported.
func (je *DefaultEncoder) Encode(subject string, v any) ([]byte, error) {
switch arg := v.(type) {
case string:
Expand All @@ -58,6 +62,8 @@ func (je *DefaultEncoder) Encode(subject string, v any) ([]byte, error) {
}

// Decode
//
// Deprecated: Encoded connections are no longer supported.
func (je *DefaultEncoder) Decode(subject string, data []byte, vPtr any) error {
// Figure out what it's pointing to...
sData := *(*string)(unsafe.Pointer(&data))
Expand Down
6 changes: 6 additions & 0 deletions encoders/builtin/gob_enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ import (
// GobEncoder is a Go specific GOB Encoder implementation for EncodedConn.
// This encoder will use the builtin encoding/gob to Marshal
// and Unmarshal most types, including structs.
//
// Deprecated: Encoded connections are no longer supported.
type GobEncoder struct {
// Empty
}

// FIXME(dlc) - This could probably be more efficient.

// Encode
//
// Deprecated: Encoded connections are no longer supported.
func (ge *GobEncoder) Encode(subject string, v any) ([]byte, error) {
b := new(bytes.Buffer)
enc := gob.NewEncoder(b)
Expand All @@ -38,6 +42,8 @@ func (ge *GobEncoder) Encode(subject string, v any) ([]byte, error) {
}

// Decode
//
// Deprecated: Encoded connections are no longer supported.
func (ge *GobEncoder) Decode(subject string, data []byte, vPtr any) (err error) {
dec := gob.NewDecoder(bytes.NewBuffer(data))
err = dec.Decode(vPtr)
Expand Down
6 changes: 6 additions & 0 deletions encoders/builtin/json_enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ import (
// JsonEncoder is a JSON Encoder implementation for EncodedConn.
// This encoder will use the builtin encoding/json to Marshal
// and Unmarshal most types, including structs.
//
// Deprecated: Encoded connections are no longer supported.
type JsonEncoder struct {
// Empty
}

// Encode
//
// Deprecated: Encoded connections are no longer supported.
func (je *JsonEncoder) Encode(subject string, v any) ([]byte, error) {
b, err := json.Marshal(v)
if err != nil {
Expand All @@ -35,6 +39,8 @@ func (je *JsonEncoder) Encode(subject string, v any) ([]byte, error) {
}

// Decode
//
// Deprecated: Encoded connections are no longer supported.
func (je *JsonEncoder) Decode(subject string, data []byte, vPtr any) (err error) {
switch arg := vPtr.(type) {
case *string:
Expand Down
8 changes: 8 additions & 0 deletions encoders/protobuf/protobuf_enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"google.golang.org/protobuf/proto"
)

//lint:file-ignore SA1019 Ignore deprecation warnings for EncodedConn

// Additional index for registered Encoders.
const (
PROTOBUF_ENCODER = "protobuf"
Expand All @@ -33,6 +35,8 @@ func init() {
// ProtobufEncoder is a protobuf implementation for EncodedConn
// This encoder will use the builtin protobuf lib to Marshal
// and Unmarshal structs.
//
// Deprecated: Encoded connections are no longer supported.
type ProtobufEncoder struct {
// Empty
}
Expand All @@ -43,6 +47,8 @@ var (
)

// Encode
//
// Deprecated: Encoded connections are no longer supported.
func (pb *ProtobufEncoder) Encode(subject string, v any) ([]byte, error) {
if v == nil {
return nil, nil
Expand All @@ -60,6 +66,8 @@ func (pb *ProtobufEncoder) Encode(subject string, v any) ([]byte, error) {
}

// Decode
//
// Deprecated: Encoded connections are no longer supported.
func (pb *ProtobufEncoder) Decode(subject string, data []byte, vPtr any) error {
if _, ok := vPtr.(*any); ok {
return nil
Expand Down
Loading