Skip to content

Commit

Permalink
feat(client/orb,server/*): Implement metadata transfer and server mid…
Browse files Browse the repository at this point in the history
…dleware. Fixes #15.
  • Loading branch information
jochumdev committed Sep 29, 2024
1 parent 17c0b37 commit b249295
Show file tree
Hide file tree
Showing 59 changed files with 1,156 additions and 842 deletions.
3 changes: 3 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ linters:
# Deprecated
- exportloopref

# Doesn't work for us
- contextcheck

issues:
# List of regexps of issue texts to exclude, empty list by default.
# But independently from this option we use default exclude patterns,
Expand Down
54 changes: 26 additions & 28 deletions client/orb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,25 +211,24 @@ func (c *Client) Call(
) (resp *client.RawResponse, err error) {
co := c.makeOptions(opts...)

// Wrap middlewares
call := c.call
for _, m := range c.middlewares {
call = m.Call(call)
}

return call(ctx, req, co)
}
// Add metadata to the context.
ctx = metadata.EnsureIncoming(ctx)
ctx = metadata.EnsureOutgoing(ctx)

func (c *Client) call(ctx context.Context, req *client.Request[any, any], opts *client.CallOptions) (resp *client.RawResponse, err error) {
transport, err := c.transportForReq(ctx, req, opts)
transport, err := c.transportForReq(ctx, req, co)
if err != nil {
return nil, err
}

// Add metadata to the context.
ctx = metadata.Ensure(ctx)
// Wrap middlewares
call := func(ctx context.Context, req *client.Request[any, any], opts *client.CallOptions) (*client.RawResponse, error) {
return transport.Call(ctx, req, opts)
}
for _, m := range c.middlewares {
call = m.Call(call)
}

return transport.Call(ctx, req, opts)
return call(ctx, req, co)
}

// CallNoCodec does the actual call without codecs.
Expand All @@ -241,25 +240,24 @@ func (c *Client) CallNoCodec(
) error {
co := c.makeOptions(opts...)

// Wrap middlewares
call := c.callNoCodec
for _, m := range c.middlewares {
call = m.CallNoCodec(call)
}

// Add metadata to the context.
ctx = metadata.Ensure(ctx)

return call(ctx, req, result, co)
}
ctx = metadata.EnsureIncoming(ctx)
ctx = metadata.EnsureOutgoing(ctx)

func (c *Client) callNoCodec(ctx context.Context, req *client.Request[any, any], result any, opts *client.CallOptions) (err error) {
transport, err := c.transportForReq(ctx, req, opts)
transport, err := c.transportForReq(ctx, req, co)
if err != nil {
return err
}

return transport.CallNoCodec(ctx, req, result, opts)
// Wrap middlewares
call := func(ctx context.Context, req *client.Request[any, any], result any, opts *client.CallOptions) error {
return transport.CallNoCodec(ctx, req, result, opts)
}
for _, m := range c.middlewares {
call = m.CallNoCodec(call)
}

return call(ctx, req, result, co)
}

// New creates a new orb client. This functions should rarely be called manually.
Expand Down Expand Up @@ -293,8 +291,8 @@ func New(cfg Config, log log.Logger, registry registry.Type) *Client {
}
}

// ProvideClient is the wire provider for client.
func ProvideClient(
// Provide is the wire provider for client.
func Provide(
name types.ServiceName,
data types.ConfigData,
logger log.Logger,
Expand Down
2 changes: 1 addition & 1 deletion client/orb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
const Name = "orb"

func init() {
client.Register(Name, ProvideClient)
client.Register(Name, Provide)
}

// Config is the config for the orb client.
Expand Down
28 changes: 15 additions & 13 deletions client/orb/transport/basehertz/basehertz.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"bytes"
"context"
"fmt"
"strings"
"slices"

hclient "github.com/cloudwego/hertz/pkg/app/client"
"github.com/cloudwego/hertz/pkg/protocol"
Expand All @@ -19,8 +19,8 @@ import (
"github.com/go-orb/plugins/client/orb"
)

// orbHeader is the prefix for every orb HTTP header.
const orbHeader = "__orb-"
//nolint:gochecknoglobals
var stdHeaders = []string{"Content-Length", "Content-Type", "Date", "Server"}

var _ (orb.Transport) = (*Transport)(nil)

Expand Down Expand Up @@ -87,10 +87,10 @@ func (t *Transport) Call(ctx context.Context, req *client.Request[any, any], opt
hReq.SetRequestURI(fmt.Sprintf("%s://%s/%s", t.scheme, node.Address, req.Endpoint()))

// Set metadata key=value to request headers.
md, ok := metadata.From(ctx)
md, ok := metadata.OutgoingFrom(ctx)
if ok {
for name, value := range md {
hReq.Header.Set(orbHeader+name, value)
hReq.Header.Set(name, value)
}
}

Expand Down Expand Up @@ -139,21 +139,23 @@ func (t *Transport) call2(
res := &client.RawResponse{
ContentType: hRes.Header.Get("Content-Type"),
Body: &hresBodyCloserWrapper{buff: buff},
Metadata: make(metadata.Metadata),
}

if hRes.StatusCode() != consts.StatusOK {
return res, orberrors.NewHTTP(hRes.StatusCode())
}

// Copy headers to the RawResponse.
for _, v := range hRes.Header.GetHeaders() {
k := string(v.GetKey())
if !strings.HasPrefix(strings.ToLower(k), orbHeader) {
continue
}
if opts.Headers != nil {
for _, v := range hRes.Header.GetHeaders() {
k := string(v.GetKey())

// Skip std headers.
if slices.Contains(stdHeaders, k) {
continue
}

res.Metadata[k[len(orbHeader):]] = string(v.GetValue())
opts.Headers[k] = string(v.GetValue())
}
}

return res, nil
Expand Down
43 changes: 23 additions & 20 deletions client/orb/transport/basehttp/basehttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"fmt"
"io"
"net/http"
"slices"
"strconv"
"strings"

"github.com/go-orb/go-orb/client"
"github.com/go-orb/go-orb/codecs"
Expand All @@ -20,8 +20,8 @@ import (
"github.com/go-orb/plugins/client/orb"
)

// orbHeader is the prefix for every orb HTTP header.
const orbHeader = "__orb-"
//nolint:gochecknoglobals
var stdHeaders = []string{"Content-Length", "Content-Type", "Date", "Server"}

var _ (orb.Transport) = (*Transport)(nil)

Expand Down Expand Up @@ -95,17 +95,17 @@ func (t *Transport) Call(ctx context.Context, req *client.Request[any, any], opt
hReq.Header.Set("Accept", opts.ContentType)

// Set metadata key=value to request headers.
md, ok := metadata.From(ctx)
md, ok := metadata.OutgoingFrom(ctx)
if ok {
for name, value := range md {
hReq.Header.Set(orbHeader+name, value)
hReq.Header.Set(name, value)
}
}

return t.call2(hReq)
return t.call2(hReq, opts)
}

func (t *Transport) call2(hReq *http.Request) (*client.RawResponse, error) {
func (t *Transport) call2(hReq *http.Request, opts *client.CallOptions) (*client.RawResponse, error) {
// Run the request.
resp, err := t.hclient.Do(hReq)
if err != nil {
Expand All @@ -128,23 +128,26 @@ func (t *Transport) call2(hReq *http.Request) (*client.RawResponse, error) {
res := &client.RawResponse{
ContentType: resp.Header.Get("Content-Type"),
Body: buff,
Metadata: make(metadata.Metadata),
}

// Copy headers to the RawResponse.
for k, v := range resp.Header {
if !strings.HasPrefix(strings.ToLower(k), orbHeader) {
continue
}
if opts.Headers != nil {
md := opts.Headers

// Copy headers to opts.Header
for k, v := range resp.Header {
// Skip std headers.
if slices.Contains(stdHeaders, k) {
continue
}

k = k[len(orbHeader):]
if len(v) == 1 {
md[k] = v[0]
} else {
md[k] = v[0]

if len(v) == 1 {
res.Metadata[k] = v[0]
} else {
res.Metadata[k] = v[0]
for i := 1; i < len(v); i++ {
res.Metadata[k+"-"+strconv.Itoa(i)] = v[i]
for i := 1; i < len(v); i++ {
md[k+"-"+strconv.Itoa(i)] = v[i]
}
}
}
}
Expand Down
26 changes: 24 additions & 2 deletions client/orb/transport/drpc/drpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@ import (

"storj.io/drpc"
"storj.io/drpc/drpcconn"
"storj.io/drpc/drpcerr"
"storj.io/drpc/drpcmetadata"
"storj.io/drpc/drpcpool"

"google.golang.org/protobuf/proto"

"github.com/go-orb/go-orb/client"
"github.com/go-orb/go-orb/log"
"github.com/go-orb/go-orb/util/metadata"
"github.com/go-orb/go-orb/util/orberrors"
"github.com/go-orb/plugins/client/orb"
"github.com/go-orb/plugins/server/drpc/message"
)

var _ drpc.Encoding = (*encoder)(nil)
Expand Down Expand Up @@ -98,14 +102,32 @@ func (t *Transport) CallNoCodec(ctx context.Context, req *client.Request[any, an

conn := t.pool.Get(ctx, node.Address, dial)

// Add metadata to drpc.
md, ok := metadata.OutgoingFrom(ctx)
if ok {
ctx = drpcmetadata.AddPairs(ctx, md)
}

ctx, cancel := context.WithDeadline(ctx, time.Now().Add(opts.RequestTimeout))
defer cancel()

err = conn.Invoke(ctx, "/"+req.Endpoint(), &t.encoder, req.Request(), result)
if err != nil {
mdResult := &message.Response{}
if err := conn.Invoke(ctx, "/"+req.Endpoint(), &t.encoder, req.Request(), mdResult); err != nil {
return orberrors.New(int(drpcerr.Code(err)), err.Error()) //nolint:gosec
}

// Unmarshal the result.
if err := mdResult.GetData().UnmarshalTo(result.(proto.Message)); err != nil {
return orberrors.From(err)
}

// Retrieve metadata from drpc.
if opts.Headers != nil {
for k, v := range mdResult.GetMetadata() {
opts.Headers[k] = v
}
}

err = conn.Close()
if err != nil {
return orberrors.From(err)
Expand Down
2 changes: 1 addition & 1 deletion client/orb/transport/grpc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ require (
golang.org/x/text v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/protobuf v1.34.2 // indirect
)
)
24 changes: 23 additions & 1 deletion client/orb/transport/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
gmetadata "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/go-orb/go-orb/client"
"github.com/go-orb/go-orb/log"
"github.com/go-orb/go-orb/util/metadata"
"github.com/go-orb/go-orb/util/orberrors"
"github.com/go-orb/plugins/client/orb"
"github.com/go-orb/plugins/client/orb/transport/grpc/pool"
Expand Down Expand Up @@ -59,6 +61,8 @@ func (t *Transport) Call(_ context.Context, _ *client.Request[any, any], _ *clie
}

// CallNoCodec does the actual rpc call to the server.
//
//nolint:funlen
func (t *Transport) CallNoCodec(ctx context.Context, req *client.Request[any, any], result any, opts *client.CallOptions) error {
node, err := req.Node(ctx, opts)
if err != nil {
Expand Down Expand Up @@ -92,10 +96,22 @@ func (t *Transport) CallNoCodec(ctx context.Context, req *client.Request[any, an
return orberrors.From(err)
}

// Append go-orb metadata to grpc.
if md, ok := metadata.OutgoingFrom(ctx); ok {
kv := []string{}
for k, v := range md {
kv = append(kv, k, v)
}

ctx = gmetadata.AppendToOutgoingContext(ctx, kv...)
}

ctx, cancel := context.WithDeadline(ctx, time.Now().Add(opts.RequestTimeout))
defer cancel()

err = conn.Invoke(ctx, "/"+req.Endpoint(), req.Request(), result)
resMeta := gmetadata.MD{}

err = conn.Invoke(ctx, "/"+req.Endpoint(), req.Request(), result, grpc.Header(&resMeta))
if err != nil {
gErr, ok := status.FromError(err)
if !ok {
Expand All @@ -111,6 +127,12 @@ func (t *Transport) CallNoCodec(ctx context.Context, req *client.Request[any, an
return orberrors.New(httpStatusCode, gErr.Message())
}

if opts.Headers != nil {
for k, v := range resMeta {
opts.Headers[k] = v[0]
}
}

err = conn.Close()
if err != nil {
gErr, ok := status.FromError(err)
Expand Down
6 changes: 3 additions & 3 deletions client/tests/cmd/tests_server/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b249295

Please sign in to comment.