grpcproxy: support lease coalescing #7221
Conversation
I will add tests later if the code structure seems reasonable.
I noticed this has broadcasts. Usually leases are only going to be updated by a single client, so I'm not sure it makes sense to structure it this way. Also, does that broadcasting mean the proxy will send a keepalive response even if a client doesn't issue a keepalive request? There's no need for special tests for this... integration tests with
proxy/grpcproxy/lease.go
Outdated
@@ -23,12 +26,21 @@ import (

type leaseProxy struct {
	client *clientv3.Client
	bcasts *leaseBroadcasts
}

func NewLeaseProxy(c *clientv3.Client) pb.LeaseServer {
Update integration/cluster_proxy.go so it uses this?
Codecov Report
@@ Coverage Diff @@
## master #7221 +/- ##
=========================================
Coverage ? 63.88%
=========================================
Files ? 233
Lines ? 20981
Branches ? 0
=========================================
Hits ? 13403
Misses ? 6579
Partials ? 999
Continue to review full report at Codecov.
@heyitsanthony, I structured the code to mirror that of watch; I am unsure if this is the best approach. Here is my thinking: I want to group together all the lease proxy streams that issue keepalive requests for the same lease ID. I call that group a broadcast. Within the broadcast structure, I use the lease client to send the keepalive request to the etcd backend for that lease ID. Once the lease client receives a keepalive response back, it broadcasts the response to all the lease proxy streams in the broadcast group. It is then the responsibility of each lease proxy stream to determine whether to send the keepalive response back to its client. broadcasts maintains the mapping from lease proxy stream to broadcast. Also, I noticed that the message transmission logic inside leaseProxyStream is not correct; let me fix that first.
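The grouping described above can be sketched roughly as follows. All names here (broadcast, broadcasts, subscribe, publish) are hypothetical illustrations of the idea, not the PR's actual types:

```go
package main

import (
	"fmt"
	"sync"
)

// broadcast fans one keepalive response out to every proxy stream
// subscribed to the same lease ID.
type broadcast struct {
	mu   sync.Mutex
	subs []chan int64 // one channel per subscribed proxy stream
}

func (b *broadcast) subscribe() <-chan int64 {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan int64, 1)
	b.subs = append(b.subs, ch)
	return ch
}

// publish delivers the TTL from one backend keepalive response
// to every subscribed stream.
func (b *broadcast) publish(ttl int64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs {
		select {
		case ch <- ttl:
		default: // subscriber not draining; skip rather than block
		}
	}
}

// broadcasts maps lease IDs to their broadcast group.
type broadcasts struct {
	mu sync.Mutex
	m  map[int64]*broadcast
}

func (bs *broadcasts) get(leaseID int64) *broadcast {
	bs.mu.Lock()
	defer bs.mu.Unlock()
	b, ok := bs.m[leaseID]
	if !ok {
		b = &broadcast{}
		bs.m[leaseID] = b
	}
	return b
}

func main() {
	bs := &broadcasts{m: make(map[int64]*broadcast)}
	// two streams keep alive the same lease; they share one group
	a := bs.get(42).subscribe()
	b := bs.get(42).subscribe()
	bs.get(42).publish(10) // one backend response reaches both streams
	fmt.Println(<-a, <-b)  // → 10 10
}
```

This only coalesces the backend traffic (one KeepAlive stream per lease ID) while each proxy stream still decides independently whether to forward the response to its own client.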
Force-pushed d2e4b9c to e3cdc70
proxy/grpcproxy/adapter_util.go
Outdated
@@ -0,0 +1,140 @@
// Copyright 2016 The etcd Authors
Please use descriptive file names. chan_stream.go?
needs a rebase against master
@heyitsanthony yeah, I will resolve the merge conflict and also remove the print statements when the code looks good.
proxy/grpcproxy/lease.go
Outdated
func (lps *leaseProxyStream) stop() {
	fmt.Printf("LeaseKeepAlive() stream %v closing... \n", lps.stream)
	lps.cancel()
	lps.wg.Wait()
if leaderc is closed, what ensures the recv loop is stopped? The stream ctx won't be canceled by lps.cancel()
LeaseKeepAlive() exiting should cause the stream to cancel, right? Therefore the recv() should be cancelled.
Yes, but LeaseKeepAlive() calls defer lps.stop(). There's a deadlock.
Can stream.send() block forever? From https://github.com/fanminshi/etcd/blob/61d99449d5e6c7ae523c90379d66f0e83dd94531/proxy/grpcproxy/lease.go#L254
If it blocks forever, then lps.stop() will be blocked; that's where I see the deadlock.
proxy/grpcproxy/lease.go
Outdated
fmt.Printf("leaseProxyStream.listeningFromServer() %v ...\n", leaseID)
var (
	msgsCh <-chan struct{}
	// leaseTTL is a timeout period for this function
Please don't write essay comments when what's happening isn't very complicated or intricate; it's misleading and makes the code more difficult to read
The code doesn't need to make this assumption anyway, the first select can block indefinitely on a nil ticker channel:
var ticker <-chan time.Time
select {
case <-ticker:
...
case ka := <-keepaliveCh:
....
ticker = time.After(ka.TTL)
}
got it. will change.
Force-pushed 4b83a4e to 61d9944
Force-pushed 61d9944 to 9f824a6
All fixed. PTAL cc @heyitsanthony.
proxy/grpcproxy/lease.go
Outdated
}
lp.mu.Unlock()

cctx, ccancel := context.WithCancel(stream.Context())
just ctx, cancel
integration/v3_lease_test.go
Outdated
@@ -438,6 +440,8 @@ func TestV3LeaseFailover(t *testing.T) {
	t.Fatal(lresp.Error)
}

expectedExp = time.Now().Add(time.Duration(lresp.TTL) * time.Second)
why move this? it works fine for the regular etcd server
The lease proxy uses watch to detect leader loss, whereas the etcd server has different logic for that. From my testing, the lease proxy detects the leader loss right away. So at:
func TestV3LeaseFailover(t *testing.T) {
...
for {
// first loop
if err = lac.Send(lreq); err != nil {
break
}
lkresp, rxerr := lac.Recv()
if rxerr != nil {
// lease proxy returns leader loss at here.
break
}
// lease proxy code never reaches here
expectedExp = time.Now().Add(time.Duration(lkresp.TTL) * time.Second)
time.Sleep(time.Duration(lkresp.TTL/2) * time.Second)
}
...
// if expectedExp not set, waits forever
time.Sleep(expectedExp.Sub(time.Now()) - 500*time.Millisecond)
}
The original code will not have expectedExp set, so the time.Sleep(expectedExp.Sub(time.Now()) - 500*time.Millisecond) sleeps forever. In order to solve that issue, we need to set expectedExp at the very beginning.
Why did the etcd server code work? From my investigation:
func TestV3LeaseFailover(t *testing.T) {
...
for {
// loop one okay
// loop two
if err = lac.Send(lreq); err != nil {
break
}
lkresp, rxerr := lac.Recv()
// loop two encounters rxerr(request timeout)
if rxerr != nil {
break
}
// loop one sets expectedExp
expectedExp = time.Now().Add(time.Duration(lkresp.TTL) * time.Second)
time.Sleep(time.Duration(lkresp.TTL/2) * time.Second)
}
...
// expectedExp is set at loop one, therefore the sleeps works.
time.Sleep(expectedExp.Sub(time.Now()) - 500*time.Millisecond)
...
}
lease proxy uses watch to detect leader loss where as the etcd server has some different logic for that.
They both go through the same interceptor path on the etcd server; the only difference is there is a separate stream monitoring for leader loss on the proxy. Why is the etcd server processing a keepalive request/response before erroring out on leader loss, but not the proxy? There should still be a delay between the leader loss and the etcd server recognizing that loss...
This needs some kind of investigation instead of trying to mask it.
I think the problem with the proxy is that using keepAlive() for lease renewal doesn't return any error to the proxy, such as the request timeout. In that case, my current code doesn't return any error from the proxy's etcd client, so it is then the job of the proxy watch to catch the leader loss event. Should we also return errors that come from the proxy's etcd client? I am not sure how to achieve that using just keepAlive().
I investigated a bit more. The reason the proxy doesn't get the first request through is that there is a delay before the lease client sends out the keepalive request to the etcd server: https://github.com/coreos/etcd/blob/master/clientv3/lease.go#L456
This delay is long enough for the watch to detect leader loss before the request is sent out. If I change the delay to 10ms, the behavior is the same as the etcd server.
proxy/grpcproxy/lease.go
Outdated
cancel()
return err
func (lp *leaseProxy) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) (err error) {
	fmt.Printf("LeaseKeepAlive() new stream %v \n", stream)
remove
@@ -0,0 +1,104 @@
// Copyright 2016 The etcd Authors
2017
lserver := &ls2lcServerStream{chanServerStream{headerc, trailerc, srv, nil}}
go func() {
	if err := c.leaseServer.LeaseKeepAlive(lserver); err != nil {
		fmt.Printf("ls2lc.LeaseKeepAlive() Recv error (%v) \n", err)
remove
proxy/grpcproxy/lease.go
Outdated
	return
case <-ticker:
	fmt.Printf("leaseProxyStream.listeningFromServer() %v timeout \n", leaseID)
shouldn't there be a return when the ticker expires?
good catch
proxy/grpcproxy/lease.go
Outdated
	fmt.Printf("leaseProxyStream.serveKeepAliveRequest() req %v done \n", rr)
}

func (lps *leaseProxyStream) listeningFromServer(leaseID int64) {
keepAliveLoop?
proxy/grpcproxy/lease.go
Outdated
	msgsCh <-chan struct{}
	ticker <-chan time.Time
)
cctx, ccancel := context.WithCancel(lps.ctx)
why have this? ccancel() is only called when the function exits, so nothing is blocking on cctx; it's a no-op
What do you mean by blocking on cctx? The purpose of the new context is to cancel the KeepAlive() for the given leaseID if keepAliveLoop() exits. If we don't use a new ctx, how do we tell the proxy's etcd client to stop sending keepalive requests for this lease ID to the etcd backend?
Oh, OK. You're right. Nevermind.
proxy/grpcproxy/lease.go
Outdated
// clean up
ccancel()
lps.mu.Lock()
delete(lps.keepAliveLeases, leaseID)
this is racy:
- client has ka stream on this proxy, issues ka reqs
- client loses connection
- client connects to another proxy, issues ka reqs
- client loses connection to that proxy
- opens new ka stream on this proxy, sends ka req; proxy receives req, reuses existing channel, posts message
- ticker times out before receiving posted message, tears down goroutine, keepalive request is lost
At step 2, shouldn't the loss of connection cause the stream to be closed and therefore close the leaseProxyStream? Then at step 5, reusing the existing channel is not possible, because the channel is closed when the leaseProxyStream is closed.
OK, it's fine for the disconnect case. Suppose there's no disconnect but the next request is delayed so that the ticker times out and the defer begins. The ka request is received on the stream and the lease proxy finds an entry in lps.keepAliveLeases, passing the request to the corresponding goroutine. However, the defer continues and tears down the goroutine. What happens to that request?
I see, got it. The request doesn't go through because keepAliveLoop() for that lease ID has been torn down. Let me rethink how to handle this case.
Suppose that I get rid of the ticker. Then keepAliveLoop() only exits if the context is cancelled or lps.client.KeepAlive() closes the response channel. In this case, a new ka request finds an entry in lps.keepAliveLeases and passes through right before keepAliveLoop()'s defer function deletes the entry from lps.keepAliveLeases. What should we do with the new request that isn't being processed? Should the keepAliveLoop() cleanup logic check whether there are outstanding keepalive requests and send some kind of error back to the stream?
The ticker / some timeout mechanism is still necessary, otherwise the proxy will keep sending keepalives until the stream is torn down.
yeah, you are right.
proxy/grpcproxy/lease.go
Outdated
ticker = time.After(time.Duration(r.TTL) * time.Second)
select {
case lps.lrpCh <- r:
default:
is it safe to drop this message on the floor?
It is a best-effort approach; I am not entirely sure it's the best solution. It is entirely possible that a critical message is dropped. Suppose we have 1000 LeaseKeepAlive reqs for lease A and only one LeaseKeepAlive req for lease B: we get all 1000 LeaseKeepAlive resps back for A, and they fill up lps.respc. Then we also get the only LeaseKeepAlive resp for B, and it will be dropped. So it would be unsafe in that scenario.
That's not how the lease grpc stream protocol works. See https://github.com/coreos/etcd/blob/master/etcdserver/api/v3rpc/lease.go#L63 -- where does it drop requests on the floor?
It's not best-effort; it will not drop requests without sending a response. If it doesn't eventually respond to every request, that should mean the stream is lost.
OK, should I just not drop the message then? Just let the goroutine wait if the channel is full, and close the channel when the stream proxy is closed?
Probably just block for now...
got it.
Force-pushed 216f711 to d1a1ccf
Force-pushed 6cd6238 to c237c0d
All fixed. PTAL cc @heyitsanthony.
clientv3/lease.go
Outdated
	return newFromLessor(l)
}

func NewLeaseFromLeaseClient(remote pb.LeaseClient, cfg Config) Lease {
if it's creating a Lease object from a raw grpc lease client, it shouldn't know about clientv3 config; defeats the purpose of having raw grpc if it needs a higher-level clientv3 config
clientv3/lease.go
Outdated
l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
return l
func newFromLessor(lessor *lessor) Lease {
new /what/? This function makes the cut at the wrong place so it's impossible to have a sensible name. Try:
func NewLease(c *Client) Lease {
	return NewLeaseFromLeaseClient(RetryLeaseClient(c), c.cfg.DialTimeout+time.Second)
}

func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Duration) Lease {
	l := &lessor{
		donec:                 make(chan struct{}),
		keepAlives:            make(map[LeaseID]*keepAlive),
		remote:                remote,
		firstKeepAliveTimeout: keepAliveTimeout,
	}
	if l.firstKeepAliveTimeout == time.Second {
		l.firstKeepAliveTimeout = defaultTTL
	}
	l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
	return l
}
(this is similar to how watch does it)
integration/cluster_proxy.go
Outdated
@@ -78,11 +81,13 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
}
rpc := toGRPC(c)
c.KV = clientv3.NewKVFromKVClient(rpc.KV)
c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, cfg)
this probably needs to be under the pmu lock to avoid a goroutine leak if cluster termination races with creating a new client
proxy/grpcproxy/lease.go
Outdated
@@ -1,4 +1,4 @@
-// Copyright 2016 The etcd Authors
+// Copyright 2017 The etcd Authors
don't change year for files that already exist
proxy/grpcproxy/lease.go
Outdated
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)

type leaseProxy struct {
	client *clientv3.Client
what uses this?
LeaseGrant(), LeaseRevoke(), and LeaseTimeToLive() use client to create a new conn, e.g.:
func (lp *leaseProxy) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
conn := lp.client.ActiveConnection()
return pb.NewLeaseClient(conn).LeaseGrant(ctx, cr)
}
These should use clientv3.Lease
sounds good.
Using clientv3.Lease fails a few tests that specifically use the raw grpc LeaseGrant() with pb.LeaseGrantRequest{ID: 1, TTL: 1}, e.g.:
lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(
context.TODO(),
&pb.LeaseGrantRequest{ID: 1, TTL: 1})
Since clientv3.Lease.Grant() does not accept a lease ID, there is no way to satisfy a LeaseGrant() request for a specific lease ID.
OK, keep around a LeaseClient then, not a full clientv3.Client.
proxy/grpcproxy/lease.go
Outdated
// don't close keepAliveLoop(); let it continue to process the KeepAlive reqs.
if canSendRespC.get() > 0 {
	lps.mu.Unlock()
	ticker = make(<-chan time.Time)
ticker = nil
proxy/grpcproxy/lease.go
Outdated
func (lps *leaseProxyStream) serveKeepAliveRequest(rr *pb.LeaseKeepAliveRequest) {
	lps.mu.Lock()
	defer lps.mu.Unlock()
	canSendRespC, ok := lps.keepAliveLeases[rr.ID]
abcC looks too much like channels. Try neededResps
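For reference, a counter like the neededResps above could be a small wrapper over sync/atomic. This is a guess at what such an atomicCounter might look like, not the PR's actual code:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// atomicCounter tracks outstanding keepalive responses owed on a lease.
type atomicCounter struct{ n int64 }

func (c *atomicCounter) add(d int64) { atomic.AddInt64(&c.n, d) }
func (c *atomicCounter) get() int64  { return atomic.LoadInt64(&c.n) }

func main() {
	var neededResps atomicCounter
	var wg sync.WaitGroup
	// many streams concurrently registering pending responses
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			neededResps.add(1)
		}()
	}
	wg.Wait()
	fmt.Println(neededResps.get()) // → 100
}
```

Unlike a channel, the counter only needs add/get, which is why a name ending in "C" misleads the reader.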
proxy/grpcproxy/lease.go
Outdated
	ID:  int64(ttlResp.ID),
	TTL: ttlResp.TTL,
}
lps.respc <- r
this breaks if canSendRespC.get() > 1
If canSendRespC.get() > 1, then I execute the following code to send the ka resp back to the client:
case rp, ok := <-respc:
if !ok {
...
if canSendRespC.get() == 0 {
return nil
}
// case: canSendRespC.get() > 1
ttlResp, err := lps.lessor.TimeToLive(cctx, clientv3.LeaseID(leaseID))
if err != nil {
return err
}
r := &pb.LeaseKeepAliveResponse{
Header: ttlResp.ResponseHeader,
ID: int64(ttlResp.ID),
TTL: ttlResp.TTL,
}
lps.respc <- r
return nil
}
...
I am unsure what the above code breaks.
If there are n > 1 responses, it will not send n responses.
I see.
proxy/grpcproxy/lease.go
Outdated
	ID:  int64(rp.ID),
	TTL: rp.TTL,
}
lps.respc <- r
This should send as many responses as possible within some short time window:
timer := time.After(500 * time.Millisecond)
for neededResps.get() > 0 {
	done := false
	select {
	case lps.respc <- r:
	case <-timer:
		done = true
	}
	if done {
		break
	}
}
Why couldn't I send n msgs where n := neededResps.get()?
The TTL will be stale
Sending as many responses as possible breaks the 1:1 reqs-to-resps ratio. E.g. suppose a client sends 2 ka reqs for the same lease to the proxy and the proxy hasn't received any resps; then the proxy receives a resp and neededResps.get() == 2. The proxy could then send 5 msgs back to the client within 500 ms.
Sorry, it should have neededResps.add(-1) at the end of the loop
I see, that makes sense. ty
Rather, not the end of the loop, but case lps.respc <- r: neededResps.add(-1)
proxy/grpcproxy/lease.go
Outdated
// toErrCh only sends a non-nil error to errCh to ensure
// only a non-nil error will be returned to the calling client
func (lps *leaseProxyStream) toErrCh(err error) {
I don't think this is necessary
The errch is not buffered. What if it receives multiple errors at the same time and blocks the goroutines that send them, e.g. multiple keepAliveLoop goroutines sending errors at the same time? My original thought was that the first error wins and is sent back to the client, and the rest of the errors are ignored.
Buffer it by 2 entries (one for recv, one for send) and keep the scope restricted to LeaseKeepAlive...
got it.
Force-pushed c237c0d to b57149b
PTAL cc @heyitsanthony. I will add the leader stuff when #7314 is merged.
proxy/grpcproxy/lease.go
Outdated
cancel func()

errc chan error
This only needs to be scoped to LeaseKeepAlive; just move the close(errc) out of lps.close(). Try to limit scope when possible, it's a corollary to the principle of least privilege.
recvLoop() and sendLoop() can return an error even after LeaseKeepAlive() returns. In that case, if I close(errc) in the scope of LeaseKeepAlive(), then it is possible that recvLoop() and sendLoop() send an error to a closed errc.
Huh? Move it to the goroutine in LeaseKeepAlive right after lps.close(). It is the exact same behavior but without the scope creep.
oh, got it. ty
proxy/grpcproxy/lease.go
Outdated
<-stopc
<-stopc
lps.close()
lp.wg.Done()
This won't stop blocking until all outstanding streams in the proxy are closed out. I don't think it should be here; it's leaky.
Could you elaborate on the blocking part? I thought the purpose of this goroutine is to make sure all resources related to this leaseProxyStream are released. Once the resources are released, it tells that the lease proxy that this stream is done. Not sure what's blocking this goroutine.
You're right; never mind. This is fine.
proxy/grpcproxy/lease.go
Outdated
ctx context.Context

cancel func()
cancel context.CancelFunc
proxy/grpcproxy/lease.go
Outdated
// mu protects keepAliveLeases
mu sync.RWMutex
// keepAliveLeases tracks how many outstanding keepalive requests which need responses are on a lease.
// the counter is used to control a 1:1 keepalive req and resp ratio.
I don't think this extra description is necessary...
OK, I'll keep just the first sentence then: "keepAliveLeases tracks how many outstanding keepalive requests which need responses are on a lease."
proxy/grpcproxy/lease.go
Outdated
select {
case <-stopc:
	stopc <- struct{}{}
case err = <-lps.errc:
What errors come out of this that are actually needed? The watch proxy doesn't do this. Is it only context cancel errors?
What about any stream error that occurs in the recvLoop? The errc catches that; the watch code ignores those errors.
You didn't answer my question. What errors are being returned?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, I am not entirely sure. I recall that the original implementation returned an error on stream.Send(); see https://github.com/coreos/etcd/blob/master/proxy/grpcproxy/lease.go#L77-L86. I am trying to keep that semantic.
ok...
anyways, should I ignore any errors that occur on stream Send() or Recv()?
just keep it I guess
proxy/grpcproxy/lease.go
Outdated
lessor clientv3.Lease
// kawg tracks keepAliveLoop goroutines
kawg sync.WaitGroup
just `wg`? There's only one waitgroup per lps.
kk, I thought kawg might be a bit more descriptive. I guess not.
proxy/grpcproxy/lease.go
Outdated
	}
}

func (lps *leaseProxyStream) toRespc(r *pb.LeaseKeepAliveResponse, neededResps *atomicCounter)
`replyToClient`? `toRespc` is not descriptive at all.
will change.
proxy/grpcproxy/lease.go
Outdated
	ID:  int64(ttlResp.ID),
	TTL: ttlResp.TTL,
}
lps.toRespc(r, neededResps)
This needs to reply to every request, not only the ones that can be sent in the TTL window; the goroutine can't terminate if it can't reply to all of them within the TTL window. If the lease is expired, it can send them all since the TTL can't be "stale". If the TimeToLive request says it's not expired, this should probably panic.
Also, I think we should take care of the case where the LeaseKeepAlive() ctx is cancelled while sending a resp msg back. In that case, replyToClient() should just return.
yes, it will need a `select` to avoid blocking on cancellation
Force-pushed b57149b to 8df2d09
All fixed. PTAL @heyitsanthony
proxy/grpcproxy/lease.go
Outdated
respc:  make(chan *pb.LeaseKeepAliveResponse),
ctx:    ctx,
cancel: cancel,
errc:   make(chan error, 2),
Seriously, only scope this to `LeaseKeepAlive`, don't put it in `leaseProxyStream`. Right below this `lps :=` statement, have `errc := make(chan error, 2)`.
proxy/grpcproxy/lease.go
Outdated
cancel context.CancelFunc

errc chan error
remove
proxy/grpcproxy/lease.go
Outdated
respc chan *pb.LeaseKeepAliveResponse

ctx context.Context
remove empty line; ctx/cancel are related
proxy/grpcproxy/lease.go
Outdated
func (lps *leaseProxyStream) replyToClient(r *pb.LeaseKeepAliveResponse, neededResps *atomicCounter, timer <-chan time.Time) {
	for neededResps.get() > 0 {
		done := false
this isn't necessary if it's a separate function, just do `return` wherever there is a `done = true`
Force-pushed 0eff4d0 to a38020a
All fixed. PTAL @heyitsanthony
Force-pushed a38020a to 37d9b4d
fixed
Force-pushed 37d9b4d to 65b59f4
Incorporated kick leader into lease proxy. PTAL @heyitsanthony
lgtm
proxy CI fails on
I don't think it is related to this PR.
Failure is unrelated
FIX #6912