Skip to content

Commit

Permalink
Deprecate encoded connections (#1674)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Aug 15, 2024
1 parent c856753 commit 3633c34
Show file tree
Hide file tree
Showing 27 changed files with 524 additions and 646 deletions.
95 changes: 0 additions & 95 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,60 +136,6 @@ To find more information on `nats.go` JetStream API, visit
The service API (`micro`) allows you to [easily build NATS services](micro/README.md) The
services API is currently in beta release.

## Encoded Connections

```go

nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer c.Close()

// Simple Publisher
c.Publish("foo", "Hello World")

// Simple Async Subscriber
c.Subscribe("foo", func(s string) {
fmt.Printf("Received a message: %s\n", s)
})

// EncodedConn can Publish any raw Go type using the registered Encoder
type person struct {
Name string
Address string
Age int
}

// Go type Subscriber
c.Subscribe("hello", func(p *person) {
fmt.Printf("Received a person: %+v\n", p)
})

me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street, San Francisco, CA"}

// Go type Publisher
c.Publish("hello", me)

// Unsubscribe
sub, err := c.Subscribe("foo", nil)
// ...
sub.Unsubscribe()

// Requests
var response string
err = c.Request("help", "help me", &response, 10*time.Millisecond)
if err != nil {
fmt.Printf("Request failed: %v\n", err)
}

// Replying
c.Subscribe("help", func(subj, reply string, msg string) {
c.Publish(reply, "I can help!")
})

// Close connection
c.Close();
```

## New Authentication (Nkeys and User Credentials)
This requires server with version >= 2.0.0

Expand Down Expand Up @@ -269,34 +215,6 @@ if err != nil {

```

## Using Go Channels (netchan)

```go
nc, _ := nats.Connect(nats.DefaultURL)
ec, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer ec.Close()

type person struct {
Name string
Address string
Age int
}

recvCh := make(chan *person)
ec.BindRecvChan("hello", recvCh)

sendCh := make(chan *person)
ec.BindSendChan("hello", sendCh)

me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street"}

// Send via Go channels
sendCh <- me

// Receive via Go channels
who := <- recvCh
```

## Wildcard Subscriptions

```go
Expand Down Expand Up @@ -463,19 +381,6 @@ msg, err := nc.RequestWithContext(ctx, "foo", []byte("bar"))
sub, err := nc.SubscribeSync("foo")
msg, err := sub.NextMsgWithContext(ctx)

// Encoded Request with context
c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
type request struct {
Message string `json:"message"`
}
type response struct {
Code int `json:"code"`
}
req := &request{Message: "Hello"}
resp := &response{}
err := c.RequestWithContext(ctx, "foo", req, resp)
```

## Backwards compatibility

In the development of nats.go, we are committed to maintaining backward compatibility and ensuring a stable and reliable experience for all users. In general, we follow the standard go compatibility guidelines.
Expand Down
2 changes: 2 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ func (nc *Conn) FlushWithContext(ctx context.Context) error {
// RequestWithContext will create an Inbox and perform a Request
// using the provided cancellation context with the Inbox reply
// for the data v. A response will be decoded into the vPtr last parameter.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) RequestWithContext(ctx context.Context, subject string, v any, vPtr any) error {
if ctx == nil {
return ErrInvalidContext
Expand Down
34 changes: 34 additions & 0 deletions enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ import (
"github.com/nats-io/nats.go/encoders/builtin"
)

//lint:file-ignore SA1019 Ignore deprecation warnings for EncodedConn

// Encoder interface is for all register encoders
//
// Deprecated: Encoded connections are no longer supported.
type Encoder interface {
Encode(subject string, v any) ([]byte, error)
Decode(subject string, data []byte, vPtr any) error
Expand All @@ -51,13 +55,17 @@ func init() {
// EncodedConn are the preferred way to interface with NATS. They wrap a bare connection to
// a nats server and have an extendable encoder system that will encode and decode messages
// from raw Go types.
//
// Deprecated: Encoded connections are no longer supported.
type EncodedConn struct {
Conn *Conn
Enc Encoder
}

// NewEncodedConn will wrap an existing Connection and utilize the appropriate registered
// encoder.
//
// Deprecated: Encoded connections are no longer supported.
func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error) {
if c == nil {
return nil, errors.New("nats: Nil Connection")
Expand All @@ -73,13 +81,17 @@ func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error) {
}

// RegisterEncoder will register the encType with the given Encoder. Useful for customization.
//
// Deprecated: Encoded connections are no longer supported.
func RegisterEncoder(encType string, enc Encoder) {
encLock.Lock()
defer encLock.Unlock()
encMap[encType] = enc
}

// EncoderForType will return the registered Encoder for the encType.
//
// Deprecated: Encoded connections are no longer supported.
func EncoderForType(encType string) Encoder {
encLock.Lock()
defer encLock.Unlock()
Expand All @@ -88,6 +100,8 @@ func EncoderForType(encType string) Encoder {

// Publish publishes the data argument to the given subject. The data argument
// will be encoded using the associated encoder.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) Publish(subject string, v any) error {
b, err := c.Enc.Encode(subject, v)
if err != nil {
Expand All @@ -99,6 +113,8 @@ func (c *EncodedConn) Publish(subject string, v any) error {
// PublishRequest will perform a Publish() expecting a response on the
// reply subject. Use Request() for automatically waiting for a response
// inline.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) PublishRequest(subject, reply string, v any) error {
b, err := c.Enc.Encode(subject, v)
if err != nil {
Expand All @@ -110,6 +126,8 @@ func (c *EncodedConn) PublishRequest(subject, reply string, v any) error {
// Request will create an Inbox and perform a Request() call
// with the Inbox reply for the data v. A response will be
// decoded into the vPtr Response.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) Request(subject string, v any, vPtr any, timeout time.Duration) error {
b, err := c.Enc.Encode(subject, v)
if err != nil {
Expand Down Expand Up @@ -150,6 +168,8 @@ func (c *EncodedConn) Request(subject string, v any, vPtr any, timeout time.Dura
// and demarshal it into the given struct, e.g. person.
// There are also variants where the callback wants either the subject, or the
// subject and the reply subject.
//
// Deprecated: Encoded connections are no longer supported.
type Handler any

// Dissect the cb Handler's signature
Expand All @@ -170,13 +190,17 @@ var emptyMsgType = reflect.TypeOf(&Msg{})
// Subscribe will create a subscription on the given subject and process incoming
// messages using the specified Handler. The Handler should be a func that matches
// a signature from the description of Handler from above.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, error) {
return c.subscribe(subject, _EMPTY_, cb)
}

// QueueSubscribe will create a queue subscription on the given subject and process
// incoming messages using the specified Handler. The Handler should be a func that
// matches a signature from the description of Handler from above.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error) {
return c.subscribe(subject, queue, cb)
}
Expand Down Expand Up @@ -238,18 +262,24 @@ func (c *EncodedConn) subscribe(subject, queue string, cb Handler) (*Subscriptio
}

// FlushTimeout allows a Flush operation to have an associated timeout.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) FlushTimeout(timeout time.Duration) (err error) {
return c.Conn.FlushTimeout(timeout)
}

// Flush will perform a round trip to the server and return when it
// receives the internal reply.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) Flush() error {
return c.Conn.Flush()
}

// Close will close the connection to the server. This call will release
// all blocking calls, such as Flush(), etc.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) Close() {
c.Conn.Close()
}
Expand All @@ -259,11 +289,15 @@ func (c *EncodedConn) Close() {
// will be drained and can not publish any additional messages. Upon draining
// of the publishers, the connection will be closed. Use the ClosedCB()
// option to know when the connection has moved from draining to closed.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) Drain() error {
return c.Conn.Drain()
}

// LastError reports the last error encountered via the Connection.
//
// Deprecated: Encoded connections are no longer supported.
func (c *EncodedConn) LastError() error {
return c.Conn.LastError()
}
6 changes: 6 additions & 0 deletions encoders/builtin/default_enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
// turn numbers into appropriate strings that can be decoded. It will also
// properly encoded and decode bools. If will encode a struct, but if you want
// to properly handle structures you should use JsonEncoder.
//
// Deprecated: Encoded connections are no longer supported.
type DefaultEncoder struct {
// Empty
}
Expand All @@ -35,6 +37,8 @@ var falseB = []byte("false")
var nilB = []byte("")

// Encode
//
// Deprecated: Encoded connections are no longer supported.
func (je *DefaultEncoder) Encode(subject string, v any) ([]byte, error) {
switch arg := v.(type) {
case string:
Expand All @@ -58,6 +62,8 @@ func (je *DefaultEncoder) Encode(subject string, v any) ([]byte, error) {
}

// Decode
//
// Deprecated: Encoded connections are no longer supported.
func (je *DefaultEncoder) Decode(subject string, data []byte, vPtr any) error {
// Figure out what it's pointing to...
sData := *(*string)(unsafe.Pointer(&data))
Expand Down
6 changes: 6 additions & 0 deletions encoders/builtin/gob_enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ import (
// GobEncoder is a Go specific GOB Encoder implementation for EncodedConn.
// This encoder will use the builtin encoding/gob to Marshal
// and Unmarshal most types, including structs.
//
// Deprecated: Encoded connections are no longer supported.
type GobEncoder struct {
// Empty
}

// FIXME(dlc) - This could probably be more efficient.

// Encode
//
// Deprecated: Encoded connections are no longer supported.
func (ge *GobEncoder) Encode(subject string, v any) ([]byte, error) {
b := new(bytes.Buffer)
enc := gob.NewEncoder(b)
Expand All @@ -38,6 +42,8 @@ func (ge *GobEncoder) Encode(subject string, v any) ([]byte, error) {
}

// Decode
//
// Deprecated: Encoded connections are no longer supported.
func (ge *GobEncoder) Decode(subject string, data []byte, vPtr any) (err error) {
dec := gob.NewDecoder(bytes.NewBuffer(data))
err = dec.Decode(vPtr)
Expand Down
6 changes: 6 additions & 0 deletions encoders/builtin/json_enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ import (
// JsonEncoder is a JSON Encoder implementation for EncodedConn.
// This encoder will use the builtin encoding/json to Marshal
// and Unmarshal most types, including structs.
//
// Deprecated: Encoded connections are no longer supported.
type JsonEncoder struct {
// Empty
}

// Encode
//
// Deprecated: Encoded connections are no longer supported.
func (je *JsonEncoder) Encode(subject string, v any) ([]byte, error) {
b, err := json.Marshal(v)
if err != nil {
Expand All @@ -35,6 +39,8 @@ func (je *JsonEncoder) Encode(subject string, v any) ([]byte, error) {
}

// Decode
//
// Deprecated: Encoded connections are no longer supported.
func (je *JsonEncoder) Decode(subject string, data []byte, vPtr any) (err error) {
switch arg := vPtr.(type) {
case *string:
Expand Down
8 changes: 8 additions & 0 deletions encoders/protobuf/protobuf_enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"google.golang.org/protobuf/proto"
)

//lint:file-ignore SA1019 Ignore deprecation warnings for EncodedConn

// Additional index for registered Encoders.
const (
PROTOBUF_ENCODER = "protobuf"
Expand All @@ -33,6 +35,8 @@ func init() {
// ProtobufEncoder is a protobuf implementation for EncodedConn
// This encoder will use the builtin protobuf lib to Marshal
// and Unmarshal structs.
//
// Deprecated: Encoded connections are no longer supported.
type ProtobufEncoder struct {
// Empty
}
Expand All @@ -43,6 +47,8 @@ var (
)

// Encode
//
// Deprecated: Encoded connections are no longer supported.
func (pb *ProtobufEncoder) Encode(subject string, v any) ([]byte, error) {
if v == nil {
return nil, nil
Expand All @@ -60,6 +66,8 @@ func (pb *ProtobufEncoder) Encode(subject string, v any) ([]byte, error) {
}

// Decode
//
// Deprecated: Encoded connections are no longer supported.
func (pb *ProtobufEncoder) Decode(subject string, data []byte, vPtr any) error {
if _, ok := vPtr.(*any); ok {
return nil
Expand Down
Loading

0 comments on commit 3633c34

Please sign in to comment.