Skip to content

Commit

Permalink
resource_manager/client: introduce watch resource group (#6510)
Browse files Browse the repository at this point in the history
ref #6509

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
CabinfeverB and ti-chi-bot[bot] authored May 31, 2023
1 parent ad05acb commit 9d0249c
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 71 deletions.
1 change: 1 addition & 0 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
// client errors
var (
ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient"))
ErrClientGetMetaStorageClient = errors.Normalize("failed to get meta storage client", errors.RFCCodeText("PD:client:ErrClientGetMetaStorageClient"))
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
Expand Down
31 changes: 22 additions & 9 deletions client/meta_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/meta_storagepb"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"go.uber.org/zap"
)

// MetaStorageClient is the interface for meta storage client.
Expand Down Expand Up @@ -125,7 +125,12 @@ func (c *client) Put(ctx context.Context, key, value []byte, opts ...OpOption) (
PrevKv: options.prevKv,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.metaStorageClient().Put(ctx, req)
cli := c.metaStorageClient()
if cli == nil {
cancel()
return nil, errs.ErrClientGetMetaStorageClient
}
resp, err := cli.Put(ctx, req)
cancel()

if err = c.respForMetaStorageErr(cmdFailedDurationPut, start, err, resp.GetHeader()); err != nil {
Expand Down Expand Up @@ -158,7 +163,12 @@ func (c *client) Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_s
Revision: options.revision,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.metaStorageClient().Get(ctx, req)
cli := c.metaStorageClient()
if cli == nil {
cancel()
return nil, errs.ErrClientGetMetaStorageClient
}
resp, err := cli.Get(ctx, req)
cancel()

if err = c.respForMetaStorageErr(cmdFailedDurationGet, start, err, resp.GetHeader()); err != nil {
Expand All @@ -177,7 +187,11 @@ func (c *client) Watch(ctx context.Context, key []byte, opts ...OpOption) (chan
options.rangeEnd = getPrefix(key)
}

res, err := c.metaStorageClient().Watch(ctx, &meta_storagepb.WatchRequest{
cli := c.metaStorageClient()
if cli == nil {
return nil, errs.ErrClientGetMetaStorageClient
}
res, err := cli.Watch(ctx, &meta_storagepb.WatchRequest{
Key: key,
RangeEnd: options.rangeEnd,
StartRevision: options.revision,
Expand All @@ -190,13 +204,12 @@ func (c *client) Watch(ctx context.Context, key []byte, opts ...OpOption) (chan
go func() {
defer func() {
close(eventCh)
if r := recover(); r != nil {
log.Error("[pd] panic in client `Watch`", zap.Any("error", r))
return
}
}()
for {
resp, err := res.Recv()
failpoint.Inject("watchStreamError", func() {
err = errors.Errorf("fake error")
})
if err != nil {
return
}
Expand Down
Loading

0 comments on commit 9d0249c

Please sign in to comment.