Skip to content

Commit

Permalink
client: remove the write operation on *registry.Node in LoadNodeConfi…
Browse files Browse the repository at this point in the history
…g to avoid data race during selecting Node (#138)

cherry-pick from internal code within the company
  • Loading branch information
liuzengh authored Nov 13, 2023
1 parent cc272f5 commit c2bdf24
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 5 deletions.
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func selectorFilter(ctx context.Context, req interface{}, rsp interface{}, next
if err != nil {
return OptionsFromContext(ctx).fixTimeout(err)
}
ensureMsgRemoteAddr(msg, node.Network, node.Address)
ensureMsgRemoteAddr(msg, findFirstNonEmpty(node.Network, opts.Network), node.Address)

// Start to process the next filter and report.
begin := time.Now()
Expand Down
2 changes: 1 addition & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func TestClientFail(t *testing.T) {
client.WithSelectorNode(node), client.WithProtocol("fake")))
require.Equal(t, node.Address, "127.0.0.1:8080")
require.Equal(t, node.ServiceName, "127.0.0.1:8080")
require.Equal(t, node.Network, "tcp")
require.Empty(t, node.Network)

// test encode failure
reqBody = &codec.Body{Data: []byte("failbody")}
Expand Down
2 changes: 0 additions & 2 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,8 +695,6 @@ func (opts *Options) LoadNodeConfig(node *registry.Node) {
if node.Network != "" {
opts.Network = node.Network
opts.CallOptions = append(opts.CallOptions, transport.WithDialNetwork(node.Network))
} else {
node.Network = opts.Network
}
if node.Protocol != "" {
WithProtocol(node.Protocol)(opts)
Expand Down
11 changes: 10 additions & 1 deletion client/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (s *stream) Init(ctx context.Context, opt ...Option) (*Options, error) {
report.SelectNodeFail.Incr()
return nil, err
}
ensureMsgRemoteAddr(msg, node.Network, node.Address)
ensureMsgRemoteAddr(msg, findFirstNonEmpty(node.Network, opts.Network), node.Address)
const invalidCost = -1
opts.Node.set(node, node.Address, invalidCost)
if opts.Codec == nil {
Expand All @@ -166,6 +166,15 @@ func (s *stream) Init(ctx context.Context, opt ...Option) (*Options, error) {
return s.opts, nil
}

func findFirstNonEmpty(ss ...string) string {
for _, s := range ss {
if s != "" {
return s
}
}
return ""
}

// Invoke implements Stream.
func (s *stream) Invoke(ctx context.Context) error {
return s.opts.StreamTransport.Init(ctx, s.opts.CallOptions...)
Expand Down

0 comments on commit c2bdf24

Please sign in to comment.