Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Returns all connection information in the registration rpc #2199

Merged
merged 3 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions pkg/gateway/mirror/handler/grpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,11 @@ func (s *server) Register(ctx context.Context, req *payload.Mirror_Targets) (*pa
}
return nil, err
}
return req, nil
}

func (s *server) Advertise(ctx context.Context, req *payload.Mirror_Targets) (res *payload.Mirror_Targets, err error) {
ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.MirrorRPCServiceName+"/"+vald.AdvertiseRPCName), apiName+"/"+vald.AdvertiseRPCName)
defer func() {
if span != nil {
span.End()
}
}()
// Get own address and the addresses of other mirror gateways to which this gateway is currently connected.
tgts, err := s.mirror.MirrorTargets()
if err != nil {
err = status.WrapWithInternal(vald.AdvertiseRPCName+" API failed to get connected vald gateway targets", err,
err = status.WrapWithInternal(vald.RegisterRPCName+" API failed to get connected vald gateway targets", err,
&errdetails.BadRequest{
FieldViolations: []*errdetails.BadRequestFieldViolation{
{
Expand All @@ -141,7 +133,7 @@ func (s *server) Advertise(ctx context.Context, req *payload.Mirror_Targets) (re
},
},
&errdetails.ResourceInfo{
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.AdvertiseRPCName,
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RegisterRPCName,
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
},
)
Expand Down
170 changes: 35 additions & 135 deletions pkg/gateway/mirror/service/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func NewMirror(opts ...MirrorOption) (_ Mirror, err error) {
func (m *mirr) Start(ctx context.Context) (<-chan error, error) {
ech := make(chan error, 100)

aech, err := m.startAdvertise(ctx)
rech, err := m.startRegister(ctx)
if err != nil {
close(ech)
return nil, err
Expand All @@ -102,7 +102,7 @@ func (m *mirr) Start(ctx context.Context) (<-chan error, error) {
select {
case <-ctx.Done():
return ctx.Err()
case err = <-aech:
case err = <-rech:
}
if err != nil {
select {
Expand All @@ -116,95 +116,55 @@ func (m *mirr) Start(ctx context.Context) (<-chan error, error) {
return ech, nil
}

func (m *mirr) startAdvertise(ctx context.Context) (<-chan error, error) {
ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Mirror.startAdvertise")
func (m *mirr) startRegister(ctx context.Context) (<-chan error, error) {
hlts2 marked this conversation as resolved.
Show resolved Hide resolved
ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Mirror.startRegister")
defer func() {
if span != nil {
span.End()
}
}()
ech := make(chan error, 100)

err := m.registers(ctx, &payload.Mirror_Targets{
Targets: m.selfMirrTgts,
})
if err != nil &&
!errors.Is(err, errors.ErrTargetNotFound) &&
!errors.Is(err, errors.ErrGRPCClientConnNotFound("*")) {
var attrs trace.Attributes

switch {
case errors.Is(err, context.Canceled):
err = status.WrapWithCanceled(
vald.InsertRPCName+" API canceld", err,
)
attrs = trace.StatusCodeCancelled(err.Error())
case errors.Is(err, context.DeadlineExceeded):
err = status.WrapWithDeadlineExceeded(
vald.InsertRPCName+" API deadline exceeded", err,
)
attrs = trace.StatusCodeDeadlineExceeded(err.Error())
default:
var (
st *status.Status
msg string
)
st, msg, err = status.ParseError(err, codes.Internal, "failed to parse "+vald.RegisterRPCName+" gRPC error response")
if span != nil {
span.RecordError(err)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
span.SetStatus(trace.StatusError, err.Error())
}
}
log.Warn(err)
if span != nil {
span.RecordError(err)
span.SetAttributes(attrs...)
span.SetStatus(trace.StatusError, err.Error())
}
close(ech)
return nil, err
}

m.eg.Go(func() (err error) {
tic := time.NewTicker(m.advertiseDur)
m.eg.Go(func() error {
// TODO: change variable names.
hlts2 marked this conversation as resolved.
Show resolved Hide resolved
hlts2 marked this conversation as resolved.
Show resolved Hide resolved
tic := time.NewTimer(m.advertiseDur)
defer close(ech)
defer tic.Stop()

for {
select {
case <-ctx.Done():
return err
case <-tic.C:
resTgts, err := m.advertises(ctx, new(payload.Mirror_Targets))
if err != nil || len(resTgts) == 0 {
if err == nil {
err = errors.ErrTargetNotFound
}
tgt, err := m.MirrorTargets()
if err != nil {
select {
case <-ctx.Done():
return ctx.Err()
case ech <- err:
break
}
}
if err = m.Connect(ctx, resTgts...); err != nil {

resTgts, err := m.registers(ctx, &payload.Mirror_Targets{Targets: tgt})
if err != nil || len(resTgts) == 0 {
if !errors.Is(err, errors.ErrTargetNotFound) && len(resTgts) == 0 {
err = errors.Join(err, errors.ErrTargetNotFound)
} else if len(resTgts) == 0 {
err = errors.ErrTargetNotFound
}
select {
case <-ctx.Done():
return ctx.Err()
case ech <- err:
break
}
}

if err := m.registers(ctx, &payload.Mirror_Targets{
Targets: append(resTgts, m.selfMirrTgts...),
}); err != nil {
select {
case <-ctx.Done():
return ctx.Err()
case ech <- err:
break
if len(resTgts) > 0 {
if err := m.Connect(ctx, resTgts...); err != nil {
select {
case <-ctx.Done():
return ctx.Err()
case ech <- err:
break
}
}
}
log.Debugf("[mirror]: connected mirror gateway targets: %v", m.gateway.GRPCClient().ConnectedAddrs())
Expand All @@ -214,7 +174,7 @@ func (m *mirr) startAdvertise(ctx context.Context) (<-chan error, error) {
return ech, nil
}

func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) error {
func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) ([]*payload.Mirror_Target, error) {
ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.MirrorRPCServiceName+"/"+vald.RegisterRPCName), "vald/gateway/mirror/service/Mirror.registers")
defer func() {
if span != nil {
Expand All @@ -227,16 +187,19 @@ func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) erro
resInfo := &errdetails.ResourceInfo{
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RegisterRPCName,
}
resTgts := make([]*payload.Mirror_Target, 0, len(tgts.GetTargets()))
exists := make(map[string]struct{})
var mu sync.Mutex

return m.gateway.DoMulti(ctx, m.connectedMirrorAddrs(), func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error {
err := m.gateway.DoMulti(ctx, m.connectedMirrorAddrs(), func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error {
ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Mirror.registers/"+target)
defer func() {
if span != nil {
span.End()
}
}()

_, err := vc.Register(ctx, tgts, copts...)
res, err := vc.Register(ctx, tgts, copts...)
if err != nil {
var attrs trace.Attributes
switch {
Expand Down Expand Up @@ -264,71 +227,6 @@ func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) erro
"failed to parse "+vald.RegisterRPCName+" gRPC error response", reqInfo, resInfo,
)
attrs = trace.FromGRPCStatus(st.Code(), msg)
}
log.Error("failed to send Register API to %s\t: %v", target, err)
if span != nil {
span.RecordError(err)
span.SetAttributes(attrs...)
span.SetStatus(trace.StatusError, err.Error())
}
return err
}
return nil
})
}

func (m *mirr) advertises(ctx context.Context, tgts *payload.Mirror_Targets) ([]*payload.Mirror_Target, error) {
ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.MirrorRPCServiceName+"/"+vald.AdvertiseRPCName), "vald/gateway/vald/service/Mirror.advertises")
defer func() {
if span != nil {
span.End()
}
}()
reqInfo := &errdetails.RequestInfo{
ServingData: errdetails.Serialize(tgts),
}
resInfo := &errdetails.ResourceInfo{
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.AdvertiseRPCName,
}
resTgts := make([]*payload.Mirror_Target, 0, len(tgts.GetTargets()))
exists := make(map[string]struct{})
var mu sync.Mutex

err := m.gateway.DoMulti(ctx, m.connectedMirrorAddrs(), func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error {
ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Mirror.advertises/"+target)
defer func() {
if span != nil {
span.End()
}
}()
res, err := vc.Advertise(ctx, tgts)
if err != nil {
var attrs trace.Attributes
switch {
case errors.Is(err, context.Canceled):
err = status.WrapWithCanceled(
vald.AdvertiseRPCName+" API canceld", err, reqInfo, resInfo,
)
attrs = trace.StatusCodeCancelled(err.Error())
case errors.Is(err, context.DeadlineExceeded):
err = status.WrapWithCanceled(
vald.AdvertiseRPCName+" API deadline exceeded", err, reqInfo, resInfo,
)
attrs = trace.StatusCodeDeadlineExceeded(err.Error())
case errors.Is(err, errors.ErrGRPCClientConnNotFound("*")):
err = status.WrapWithInternal(
vald.AdvertiseRPCName+" API connection not found", err, reqInfo, resInfo,
)
attrs = trace.StatusCodeInternal(err.Error())
default:
var (
st *status.Status
msg string
)
st, msg, err = status.ParseError(err, codes.Internal,
"failed to parse "+vald.AdvertiseRPCName+" gRPC error response", reqInfo, resInfo,
)
attrs = trace.FromGRPCStatus(st.Code(), msg)

// When ingress is deleted, the controller's default backend results(Unimplemented error) are returned so that the connection should be disconnected.
// If it is a different namespace on the same cluster, the connection is automatically disconnected because the net.grpc health check fails.
Expand All @@ -346,7 +244,7 @@ func (m *mirr) advertises(ctx context.Context, tgts *payload.Mirror_Targets) ([]
}
}
}
log.Errorf("failed to process advertise requst to %s\terror: %s", target, err.Error())
log.Error("failed to send Register API to %s\t: %v", target, err)
if span != nil {
span.RecordError(err)
span.SetAttributes(attrs...)
Expand All @@ -356,8 +254,8 @@ func (m *mirr) advertises(ctx context.Context, tgts *payload.Mirror_Targets) ([]
}
if res != nil && len(res.GetTargets()) > 0 {
for _, tgt := range res.GetTargets() {
mu.Lock()
addr := net.JoinHostPort(tgt.Host, uint16(tgt.Port))
mu.Lock()
if _, ok := exists[addr]; !ok {
exists[addr] = struct{}{}
resTgts = append(resTgts, res.GetTargets()...)
Expand Down Expand Up @@ -431,6 +329,7 @@ func (m *mirr) Exist(_ context.Context, addr string) bool {
return ok
}

// MirrorTargets returns own address and the addresses of other mirror gateways to which this gateway is currently connected.
func (m *mirr) MirrorTargets() ([]*payload.Mirror_Target, error) {
addrs := m.gateway.GRPCClient().ConnectedAddrs()
tgts := make([]*payload.Mirror_Target, 0, len(addrs)+1)
Expand Down Expand Up @@ -460,6 +359,7 @@ func (m *mirr) isGatewayAddr(addr string) bool {
return ok
}

// connected returns the addresses of other mirror gateways to which this gateway is currently connected.
func (m *mirr) connectedMirrorAddrs() []string {
connectedAddrs := m.gateway.GRPCClient().ConnectedAddrs()
addrs := make([]string, 0, len(connectedAddrs))
Expand Down
Loading