Skip to content

Commit

Permalink
fix: pool-coordinator ask client to re-list-watch when proxy target c…
Browse files Browse the repository at this point in the history
…hange(cloud to pool-coor or pool-coor to cloud) (#1195)

Co-authored-by: lizhixin.lzx <lizhixin.lzx@alibaba-inc.com>
  • Loading branch information
LaurenceLiZhixin and lizhixin.lzx authored Feb 10, 2023
1 parent de106a8 commit 08e1619
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 7 deletions.
1 change: 1 addition & 0 deletions pkg/yurthub/proxy/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (pp *PoolCoordinatorProxy) poolWatch(rw http.ResponseWriter, req *http.Requ
case <-t.C:
if !pp.isCoordinatorReady() {
klog.Infof("notified the pool coordinator is not ready for handling request, cancel watch %s", hubutil.ReqString(req))
util.ReListWatchReq(rw, req)
poolServeCancel()
return
}
Expand Down
13 changes: 6 additions & 7 deletions pkg/yurthub/proxy/remote/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,10 @@ func NewLoadBalancer(
algo = &rrLoadBalancerAlgo{backends: backends, checker: healthChecker}
}

return &loadBalancer{
backends: backends,
algo: algo,
}, nil
lb.backends = backends
lb.algo = algo

return lb, nil
}

func (lb *loadBalancer) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -211,19 +211,18 @@ func (lb *loadBalancer) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
for {
select {
case <-t.C:
if lb.coordinatorGetter == nil {
continue
}
coordinator := lb.coordinatorGetter()
if coordinator == nil {
continue
}
if _, isReady := coordinator.IsReady(); isReady {
klog.Infof("notified the pool coordinator is ready, cancel the req %s making it handled by pool coordinator", hubutil.ReqString(req))
util.ReListWatchReq(rw, req)
cloudServeCancel()
return
}
case <-clientReqCtx.Done():
klog.Infof("watch req %s is canceled by client, when pool coordinator is not ready", hubutil.ReqString(req))
return
}
}
Expand Down
36 changes: 36 additions & 0 deletions pkg/yurthub/proxy/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package util
import (
"context"
"fmt"
"mime"
"net/http"
"strings"
"time"
Expand All @@ -32,6 +33,8 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/authentication/serviceaccount"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
Expand Down Expand Up @@ -490,3 +493,36 @@ func IsEventCreateRequest(req *http.Request) bool {
info.Resource == "events" &&
info.Verb == "create"
}

func ReListWatchReq(rw http.ResponseWriter, req *http.Request) {
agent, _ := util.ClientComponentFrom(req.Context())
klog.Infof("component %s request urL %s with rv = %s is rejected, expect re-list",
agent, util.ReqString(req), req.URL.Query().Get("resourceVersion"))

serializerManager := serializer.NewSerializerManager()
mediaType, params, _ := mime.ParseMediaType(runtime.ContentTypeProtobuf)

_, streamingSerializer, framer, err := serializerManager.WatchEventClientNegotiator.StreamDecoder(mediaType, params)
if err != nil {
klog.Errorf("ReListWatchReq %s failed with error = %s", util.ReqString(req), err.Error())
return
}

streamingEncoder := streaming.NewEncoder(framer.NewFrameWriter(rw), streamingSerializer)
if err != nil {
klog.Errorf("ReListWatchReq %s failed with error = %s", util.ReqString(req), err.Error())
return
}

outEvent := &metav1.WatchEvent{
Type: string(watch.Error),
}

if err := streamingEncoder.Encode(outEvent); err != nil {
klog.Errorf("ReListWatchReq %s failed with error = %s", util.ReqString(req), err.Error())
return
}

klog.Infof("this request write error event back finished.")
rw.(http.Flusher).Flush()
}

0 comments on commit 08e1619

Please sign in to comment.