Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: get event streaming working #29

Merged
merged 19 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,18 @@ tidy:
.PHONY: style
style:
goimports -l -w ./api
goimports -l -w ./broker
goimports -l -w ./client
goimports -l -w ./runtime
goimports -l -w ./security
goimports -l -w ./server
goimports -l -w ./store
goimports -l -w ./streams
goimports -l -w ./telemetry
goimports -l -w ./utils

.PHONY: clean
clean:
go clean -testcache

.PHONY: test
test:
go test -v -race ./...
go clean -testcache && go test -v -race ./...

.PHONY: proto-account
proto-account:
Expand All @@ -38,6 +34,10 @@ proto-rule:
proto-runtime:
protoc proto/runtime/*.proto --go_out=paths=source_relative:. --proto_path=.

.PHONY: proto-streams
proto-streams:
protoc proto/streams/*.proto --go_out=paths=source_relative:. --proto_path=.

.PHONY: proto-ticket
proto-ticket:
protoc proto/ticket/*.proto --go_out=paths=source_relative:. --proto_path=.
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
| Package | Examples | Use Case |
| --------- | ---------------- | --------------------------------- |
| api | http | build gateway servers |
| broker | nats | asynchronous communication |
| client | grpc | synchronous communication |
| runtime | kubernetes | service info |
| security | jwts, TBD | token provisioning and encryption |
| server | grpc | build backend servers |
| store | cockroach, redis | data persistence |
| streams | redis | asynchronous communication |
| telemetry | memory | logs, metrics, and traces |
4 changes: 2 additions & 2 deletions api/httpapi/http_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
type httpApi struct {
options api.ApiOptions
mux *http.ServeMux
mtx sync.RWMutex
exit chan chan error
mtx sync.RWMutex
}

func (a *httpApi) Options() api.ApiOptions {
Expand Down Expand Up @@ -88,8 +88,8 @@ func NewApi(opts ...api.ApiOption) api.Api {
a := &httpApi{
options: options,
mux: http.NewServeMux(),
mtx: sync.RWMutex{},
exit: make(chan chan error),
mtx: sync.RWMutex{},
}

return a
Expand Down
1 change: 1 addition & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ type Client interface {
Options() ClientOptions
NewRequest(opts ...RequestOption) Request
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
String() string
}
178 changes: 167 additions & 11 deletions client/grpcclient/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package grpcclient
import (
"context"
"fmt"
"sync"
"time"

"github.com/w-h-a/pkg/client"
Expand Down Expand Up @@ -59,6 +60,7 @@ func (c *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
}

actualCall := c.call

for i := len(callOptions.CallWrappers); i > 0; i-- {
actualCall = callOptions.CallWrappers[i-1](actualCall)
}
Expand All @@ -75,14 +77,14 @@ func (c *grpcClient) Call(ctx context.Context, req client.Request, rsp interface

namespace := req.Namespace()

server := req.Server()
name := req.Service()

service, err := next()
if err != nil {
if err == client.ErrServiceNotFound {
return errorutils.InternalServerError("client", "failed to find %s.%s: %v", server, namespace, err)
return errorutils.InternalServerError("client", "failed to find %s.%s: %v", name, namespace, err)
}
return errorutils.InternalServerError("client", "failed to select %s.%s: %v", server, namespace, err)
return errorutils.InternalServerError("client", "failed to select %s.%s: %v", name, namespace, err)
}

// TODO: refactor this cruft
Expand All @@ -104,7 +106,7 @@ func (c *grpcClient) Call(ctx context.Context, req client.Request, rsp interface

var e error

// retry lopp
// retry loop
for i := 0; i <= callOptions.RetryCount; i++ {
go func(i int) {
ch <- call(i)
Expand Down Expand Up @@ -134,34 +136,125 @@ func (c *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
return e
}

func (c *grpcClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
callOptions := client.NewCallOptions(&c.options.CallOptions, opts...)

next, err := c.next(req, callOptions)
if err != nil {
return nil, err
}

select {
case <-ctx.Done():
return nil, errorutils.Timeout("client", fmt.Sprintf("%v", ctx.Err()))
default:
}

call := func(i int) (client.Stream, error) {
duration, err := callOptions.Backoff(ctx, req, i)
if err != nil {
return nil, errorutils.InternalServerError("client", err.Error())
}

if duration.Seconds() > 0 {
time.Sleep(duration)
}

namespace := req.Namespace()

name := req.Service()

service, err := next()
if err != nil {
if err == client.ErrServiceNotFound {
return nil, errorutils.InternalServerError("client", "failed to find %s.%s: %v", name, namespace, err)
}
return nil, errorutils.InternalServerError("client", "failed to select %s.%s: %v", name, namespace, err)
}

// TODO: refactor this cruft
address := service.Name + "." + service.Namespace + ":" + fmt.Sprintf("%d", service.Port)

if len(service.Address) > 0 {
address = service.Address
}

stream, err := c.stream(ctx, address, req, callOptions)
if e, ok := err.(*errorutils.Error); ok {
return stream, e
}

return stream, err
}

type response struct {
stream client.Stream
err error
}

ch := make(chan response, callOptions.RetryCount+1)

var e error

// retry loop
for i := 0; i <= callOptions.RetryCount; i++ {
go func(i int) {
s, err := call(i)
ch <- response{s, err}
}(i)

select {
case <-ctx.Done():
return nil, errorutils.Timeout("client", fmt.Sprintf("%v", ctx.Err()))
case rsp := <-ch:
if rsp.err == nil {
return rsp.stream, rsp.err
}

shouldRetry, retryErr := callOptions.RetryCheck(ctx, req, i, rsp.err)
if retryErr != nil {
return nil, retryErr
}

if !shouldRetry {
return nil, rsp.err
}

e = rsp.err
}
}

return nil, e
}

func (c *grpcClient) String() string {
return "grpc"
}

func (c *grpcClient) next(request client.Request, options client.CallOptions) (func() (*runtime.Service, error), error) {
namespace := request.Namespace()
server := request.Server()
name := request.Service()
port := request.Port()

// if we have the address already, use that
if len(options.Address) > 0 {
return func() (*runtime.Service, error) {
return &runtime.Service{
Namespace: namespace,
Name: server,
Name: name,
Port: port,
Address: options.Address,
}, nil
}, nil
}

// otherwise get the details from the selector
next, err := c.options.Selector.Select(namespace, server, port, options.SelectOpts...)
next, err := c.options.Selector.Select(namespace, name, port, options.SelectOpts...)
if err != nil {
if err == client.ErrServiceNotFound {
return nil, errorutils.InternalServerError("client", "failed to find %s.%s: %v", server, namespace, err)
return nil, errorutils.InternalServerError("client", "failed to find %s.%s: %v", name, namespace, err)
}
return nil, errorutils.InternalServerError("client", "failed to select %s.%s: %v", server, namespace, err)
return nil, errorutils.InternalServerError("client", "failed to select %s.%s: %v", name, namespace, err)
}

return next, nil
Expand Down Expand Up @@ -196,7 +289,7 @@ func (c *grpcClient) call(ctx context.Context, address string, req client.Reques
c.withCreds(address),
}

clientConn, err := grpc.NewClient(address, grpcDialOptions...)
conn, err := grpc.NewClient(address, grpcDialOptions...)
if err != nil {
return errorutils.InternalServerError("client", fmt.Sprintf("failed to get client connection: %v", err))
}
Expand All @@ -211,7 +304,7 @@ func (c *grpcClient) call(ctx context.Context, address string, req client.Reques
grpc.CallContentSubtype(marshaler.Name()),
}

err := clientConn.Invoke(
err := conn.Invoke(
ctx,
ToGRPCMethod(req.Method()),
req.Unmarshaled(),
Expand All @@ -232,6 +325,69 @@ func (c *grpcClient) call(ctx context.Context, address string, req client.Reques
return e
}

func (c *grpcClient) stream(ctx context.Context, address string, req client.Request, _ client.CallOptions) (client.Stream, error) {
header := map[string]string{}

md, ok := metadatautils.FromContext(ctx)
if ok {
for k, v := range md {
header[k] = v
}
}

header["content-type"] = req.ContentType()

grpcMetadata := metadata.New(header)

ctx = metadata.NewOutgoingContext(ctx, grpcMetadata)

marshaler, err := c.newMarshaler(req.ContentType())
if err != nil {
return nil, errorutils.InternalServerError("client", err.Error())
}

grpcDialOptions := []grpc.DialOption{
c.withCreds(address),
}

conn, err := grpc.NewClient(address, grpcDialOptions...)
if err != nil {
return nil, errorutils.InternalServerError("client", fmt.Sprintf("failed to get client connection: %v", err))
}

grpcCallOptions := []grpc.CallOption{
grpc.ForceCodec(marshaler),
grpc.CallContentSubtype(marshaler.Name()),
}

newCtx, cancel := context.WithCancel(ctx)

s, err := conn.NewStream(
newCtx,
&grpc.StreamDesc{
StreamName: req.Method(),
ClientStreams: true,
ServerStreams: true,
},
ToGRPCMethod(req.Method()),
grpcCallOptions...,
)
if err != nil {
cancel()
conn.Close()
return nil, errorutils.InternalServerError("client", fmt.Sprintf("failed to create stream: %v", err))
}

return &grpcStream{
context: ctx,
cancel: cancel,
request: req,
connection: conn,
stream: s,
mtx: sync.RWMutex{},
}, nil
}

func (c *grpcClient) newMarshaler(contentType string) (encoding.Codec, error) {
marshaler, ok := marshalutils.DefaultMarshalers[contentType]
if !ok {
Expand Down
6 changes: 5 additions & 1 deletion client/grpcclient/grpc_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func (r *grpcRequest) Namespace() string {
return r.options.Namespace
}

func (r *grpcRequest) Server() string {
func (r *grpcRequest) Service() string {
return r.options.Name
}

Expand All @@ -34,6 +34,10 @@ func (r *grpcRequest) Unmarshaled() interface{} {
return r.options.UnmarshaledRequest
}

func (r *grpcRequest) Stream() bool {
return r.options.Stream
}

func (r *grpcRequest) String() string {
return "grpc"
}
Expand Down
Loading