Skip to content

Commit

Permalink
Merge pull request #44 from w-h-a/order-created
Browse files Browse the repository at this point in the history
refactor: modify stream subscriber
  • Loading branch information
w-h-a committed Jul 13, 2024
2 parents b5d5ccf + a31a5f8 commit 6f615e4
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 82 deletions.
18 changes: 16 additions & 2 deletions client/mockclient/mock_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
type mockClient struct {
options client.ClientOptions
responses map[string]Response
streams map[string]client.Stream
client client.Client
mtx sync.RWMutex
}
Expand Down Expand Up @@ -49,8 +50,15 @@ func (c *mockClient) Call(ctx context.Context, req client.Request, rsp interface
}

func (c *mockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
// TODO
return nil, nil
c.mtx.RLock()
defer c.mtx.RUnlock()

mock, ok := c.streams[req.Service()+":"+req.Method()]
if !ok {
return nil, errorutils.NotFound("mock.client", "service:method %s:%s not found in streams %+v", req.Service(), req.Method(), c.streams)
}

return mock, nil
}

func (c *mockClient) String() string {
Expand All @@ -65,6 +73,11 @@ func NewClient(opts ...client.ClientOption) client.Client {
responses = map[string]Response{}
}

streams, ok := GetStreamsFromContext(options.Context)
if !ok {
streams = map[string]client.Stream{}
}

c, ok := GetClientFromContext(options.Context)
if !ok {
c = grpcclient.NewClient()
Expand All @@ -73,6 +86,7 @@ func NewClient(opts ...client.ClientOption) client.Client {
m := &mockClient{
options: options,
responses: responses,
streams: streams,
client: c,
mtx: sync.RWMutex{},
}
Expand Down
77 changes: 77 additions & 0 deletions client/mockclient/mock_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package mockclient

import (
"context"
"reflect"
"sync"

"github.com/w-h-a/pkg/client"
)

type mockStream struct {
send Response
recv Response
err error
mtx sync.RWMutex
}

func (s *mockStream) Context() context.Context {
return nil
}

func (s *mockStream) Request() client.Request {
return nil
}

func (s *mockStream) Send(_ interface{}) error {
s.mtx.RLock()
defer s.mtx.RUnlock()

mock := s.send

if mock.Err != nil {
s.setError(mock.Err)
return mock.Err
}

return nil
}

func (s *mockStream) Recv(msg interface{}) error {
s.mtx.RLock()
defer s.mtx.RUnlock()

mock := s.recv

if mock.Err != nil {
s.setError(mock.Err)
return mock.Err
}

val := reflect.ValueOf(msg)
val = reflect.Indirect(val)

response := mock.Response

val.Set(reflect.ValueOf(response))

return nil
}

func (s *mockStream) Error() error {
s.mtx.RLock()
defer s.mtx.RUnlock()

return s.err
}

func (s *mockStream) Close() error {
return nil
}

func (s *mockStream) setError(e error) {
s.mtx.Lock()
defer s.mtx.Unlock()

s.err = e
}
24 changes: 24 additions & 0 deletions client/mockclient/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package mockclient

import (
"context"
"sync"

"github.com/w-h-a/pkg/client"
)

type responsesKey struct{}
type streamsKey struct{}
type clientKey struct{}

func ClientWithResponses(service, method string, response Response) client.ClientOption {
Expand All @@ -27,6 +29,28 @@ func GetResponsesFromContext(ctx context.Context) (map[string]Response, bool) {
return rsp, ok
}

func ClientWithStreams(service, method string, send, recv Response) client.ClientOption {
return func(o *client.ClientOptions) {
streams, ok := GetStreamsFromContext(o.Context)
if !ok {
streams = map[string]client.Stream{}
}

streams[service+":"+method] = &mockStream{
send: send,
recv: recv,
mtx: sync.RWMutex{},
}

o.Context = context.WithValue(o.Context, streamsKey{}, streams)
}
}

func GetStreamsFromContext(ctx context.Context) (map[string]client.Stream, bool) {
str, ok := ctx.Value(streamsKey{}).(map[string]client.Stream)
return str, ok
}

func ClientWithClient(c client.Client) client.ClientOption {
return func(o *client.ClientOptions) {
o.Context = context.WithValue(o.Context, clientKey{}, c)
Expand Down
91 changes: 41 additions & 50 deletions proto/streams/streams.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 5 additions & 6 deletions proto/streams/streams.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ message SubscribeRequest {
string id = 1;
string group = 2;
string topic = 3;
bool auto_ack = 4;
int64 ack_wait = 5;
int64 retry_limit = 6;
int64 offset = 7;
int64 ack_wait = 4;
int64 retry_limit = 5;
int64 offset = 6;
}

message SubscribeResponse {}
Expand All @@ -34,12 +33,12 @@ message UnsubscribeRequest {

message UnsubscribeResponse {}

// consume request (stream Event response)
// consume request
message ConsumeRequest {
string id = 1;
}

// ack request (no response)
// ack request
message AckRequest {
string id = 1;
bool success = 2;
Expand Down
46 changes: 28 additions & 18 deletions proto/ticket/ticket.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 6f615e4

Please sign in to comment.