Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Returns all connection information in the registration rpc #2199

Merged
merged 3 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion charts/vald/templates/gateway/mirror/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ data:
{{- include "vald.observability" $observability | nindent 6 }}
gateway:
pod_name: {{ $gateway.gateway_config.pod_name }}
advertise_interval: {{ $gateway.gateway_config.advertise_interval }}
register_duration: {{ $gateway.gateway_config.register_duration }}
namespace: {{ $gateway.gateway_config.namespace }}
discovery_duration: {{ $gateway.gateway_config.discovery_duration }}
colocation: {{ $gateway.gateway_config.colocation }}
Expand Down
6 changes: 3 additions & 3 deletions charts/vald/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1716,9 +1716,9 @@ gateway:
# @schema {"name": "gateway.mirror.gateway_config.pod_name", "type": "string"}
# gateway.mirror.gateway_config.pod_name -- self mirror gateway pod name
pod_name: _MY_POD_NAME_
# @schema {"name": "gateway.mirror.gateway_config.advertise_interval", "type": "string"}
# gateway.mirror.gateway_config.advertise_interval -- interval to advertise mirror-gateway information to other mirror-gateway.
advertise_interval: "1s"
# @schema {"name": "gateway.mirror.gateway_config.register_duration", "type": "string"}
# gateway.mirror.gateway_config.register_duration -- duration to register mirror-gateway.
register_duration: "1s"
# @schema {"name": "gateway.mirror.gateway_config.namespace", "type": "string"}
# gateway.mirror.gateway_config.namespace -- namespace to discovery
namespace: _MY_POD_NAMESPACE_
Expand Down
6 changes: 3 additions & 3 deletions internal/config/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type Mirror struct {
GatewayAddr string `json:"gateway_addr" yaml:"gateway_addr"`
// PodName represents the mirror gateway pod name.
PodName string `json:"pod_name" yaml:"pod_name"`
// AdvertiseInterval represents the interval to advertise addresses of Mirror Gateway to other Mirror Gateway.
AdvertiseInterval string `json:"advertise_interval" yaml:"advertise_interval"`
// RegisterDuration represents the duration to register Mirror Gateway.
RegisterDuration string `json:"register_duration" yaml:"register_duration"`
// Namespace represents the target namespace to discover ValdMirrorTarget resource.
Namespace string `json:"namespace" yaml:"namespace"`
// DiscoveryDuration represents the duration to discover.
Expand All @@ -43,7 +43,7 @@ func (m *Mirror) Bind() *Mirror {
m.SelfMirrorAddr = GetActualValue(m.SelfMirrorAddr)
m.GatewayAddr = GetActualValue(m.GatewayAddr)
m.PodName = GetActualValue(m.PodName)
m.AdvertiseInterval = GetActualValue(m.AdvertiseInterval)
m.RegisterDuration = GetActualValue(m.RegisterDuration)
m.Namespace = GetActualValue(m.Namespace)
m.DiscoveryDuration = GetActualValue(m.DiscoveryDuration)
m.Colocation = GetActualValue(m.Colocation)
Expand Down
14 changes: 3 additions & 11 deletions pkg/gateway/mirror/handler/grpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,11 @@ func (s *server) Register(ctx context.Context, req *payload.Mirror_Targets) (*pa
}
return nil, err
}
return req, nil
}

func (s *server) Advertise(ctx context.Context, req *payload.Mirror_Targets) (res *payload.Mirror_Targets, err error) {
ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.MirrorRPCServiceName+"/"+vald.AdvertiseRPCName), apiName+"/"+vald.AdvertiseRPCName)
defer func() {
if span != nil {
span.End()
}
}()
// Get own address and the addresses of other mirror gateways to which this gateway is currently connected.
tgts, err := s.mirror.MirrorTargets()
if err != nil {
err = status.WrapWithInternal(vald.AdvertiseRPCName+" API failed to get connected vald gateway targets", err,
err = status.WrapWithInternal(vald.RegisterRPCName+" API failed to get connected vald gateway targets", err,
&errdetails.BadRequest{
FieldViolations: []*errdetails.BadRequestFieldViolation{
{
Expand All @@ -141,7 +133,7 @@ func (s *server) Advertise(ctx context.Context, req *payload.Mirror_Targets) (re
},
},
&errdetails.ResourceInfo{
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.AdvertiseRPCName,
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RegisterRPCName,
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
},
)
Expand Down
200 changes: 35 additions & 165 deletions pkg/gateway/mirror/service/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
// Mirror manages other mirror gateway connection.
// If there is a new Mirror Gateway components, registers new connection.
type Mirror interface {
Start(ctx context.Context) (<-chan error, error)
Start(ctx context.Context) <-chan error
Connect(ctx context.Context, targets ...*payload.Mirror_Target) error
Disconnect(ctx context.Context, targets ...*payload.Mirror_Target) error
IsConnected(ctx context.Context, addr string) bool
Expand All @@ -50,7 +50,7 @@ type mirr struct {
selfMirrAddrl sync.Map[string, any] // List of self Mirror gateway addresses
gwAddrl sync.Map[string, any] // List of Vald Gateway addresses
eg errgroup.Group
advertiseDur time.Duration
registerDur time.Duration
gateway Gateway
}

Expand Down Expand Up @@ -87,134 +87,64 @@ func NewMirror(opts ...MirrorOption) (_ Mirror, err error) {
return m, err
}

func (m *mirr) Start(ctx context.Context) (<-chan error, error) {
ech := make(chan error, 100)

aech, err := m.startAdvertise(ctx)
if err != nil {
close(ech)
return nil, err
}

m.eg.Go(func() (err error) {
defer close(ech)
for {
select {
case <-ctx.Done():
return ctx.Err()
case err = <-aech:
}
if err != nil {
select {
case <-ctx.Done():
case ech <- err:
}
err = nil
}
}
})
return ech, nil
}

func (m *mirr) startAdvertise(ctx context.Context) (<-chan error, error) {
ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Mirror.startAdvertise")
func (m *mirr) Start(ctx context.Context) <-chan error {
ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Mirror.Start")
defer func() {
if span != nil {
span.End()
}
}()
ech := make(chan error, 100)

err := m.registers(ctx, &payload.Mirror_Targets{
Targets: m.selfMirrTgts,
})
if err != nil &&
!errors.Is(err, errors.ErrTargetNotFound) &&
!errors.Is(err, errors.ErrGRPCClientConnNotFound("*")) {
var attrs trace.Attributes

switch {
case errors.Is(err, context.Canceled):
err = status.WrapWithCanceled(
vald.InsertRPCName+" API canceld", err,
)
attrs = trace.StatusCodeCancelled(err.Error())
case errors.Is(err, context.DeadlineExceeded):
err = status.WrapWithDeadlineExceeded(
vald.InsertRPCName+" API deadline exceeded", err,
)
attrs = trace.StatusCodeDeadlineExceeded(err.Error())
default:
var (
st *status.Status
msg string
)
st, msg, err = status.ParseError(err, codes.Internal, "failed to parse "+vald.RegisterRPCName+" gRPC error response")
if span != nil {
span.RecordError(err)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
span.SetStatus(trace.StatusError, err.Error())
}
}
log.Warn(err)
if span != nil {
span.RecordError(err)
span.SetAttributes(attrs...)
span.SetStatus(trace.StatusError, err.Error())
}
close(ech)
return nil, err
}

m.eg.Go(func() (err error) {
tic := time.NewTicker(m.advertiseDur)
m.eg.Go(func() error {
tic := time.NewTicker(m.registerDur)
defer close(ech)
defer tic.Stop()

for {
select {
case <-ctx.Done():
return err
case <-tic.C:
resTgts, err := m.advertises(ctx, new(payload.Mirror_Targets))
if err != nil || len(resTgts) == 0 {
if err == nil {
err = errors.ErrTargetNotFound
}
tgt, err := m.MirrorTargets()
if err != nil {
select {
case <-ctx.Done():
return ctx.Err()
case ech <- err:
break
}
}
if err = m.Connect(ctx, resTgts...); err != nil {

resTgts, err := m.registers(ctx, &payload.Mirror_Targets{Targets: tgt})
if err != nil || len(resTgts) == 0 {
if !errors.Is(err, errors.ErrTargetNotFound) && len(resTgts) == 0 {
err = errors.Join(err, errors.ErrTargetNotFound)
} else if len(resTgts) == 0 {
err = errors.ErrTargetNotFound
}
select {
case <-ctx.Done():
return ctx.Err()
case ech <- err:
break
}
}

if err := m.registers(ctx, &payload.Mirror_Targets{
Targets: append(resTgts, m.selfMirrTgts...),
}); err != nil {
select {
case <-ctx.Done():
return ctx.Err()
case ech <- err:
break
if len(resTgts) > 0 {
if err := m.Connect(ctx, resTgts...); err != nil {
select {
case <-ctx.Done():
return ctx.Err()
case ech <- err:
break
}
}
}
log.Debugf("[mirror]: connected mirror gateway targets: %v", m.gateway.GRPCClient().ConnectedAddrs())
}
}
})
return ech, nil
return ech
}

func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) error {
func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) ([]*payload.Mirror_Target, error) {
ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.MirrorRPCServiceName+"/"+vald.RegisterRPCName), "vald/gateway/mirror/service/Mirror.registers")
defer func() {
if span != nil {
Expand All @@ -227,16 +157,19 @@ func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) erro
resInfo := &errdetails.ResourceInfo{
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RegisterRPCName,
}
resTgts := make([]*payload.Mirror_Target, 0, len(tgts.GetTargets()))
exists := make(map[string]struct{})
var mu sync.Mutex

return m.gateway.DoMulti(ctx, m.connectedMirrorAddrs(), func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error {
err := m.gateway.DoMulti(ctx, m.connectedMirrorAddrs(), func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error {
ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Mirror.registers/"+target)
defer func() {
if span != nil {
span.End()
}
}()

_, err := vc.Register(ctx, tgts, copts...)
res, err := vc.Register(ctx, tgts, copts...)
if err != nil {
var attrs trace.Attributes
switch {
Expand Down Expand Up @@ -264,71 +197,6 @@ func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) erro
"failed to parse "+vald.RegisterRPCName+" gRPC error response", reqInfo, resInfo,
)
attrs = trace.FromGRPCStatus(st.Code(), msg)
}
log.Error("failed to send Register API to %s\t: %v", target, err)
if span != nil {
span.RecordError(err)
span.SetAttributes(attrs...)
span.SetStatus(trace.StatusError, err.Error())
}
return err
}
return nil
})
}

func (m *mirr) advertises(ctx context.Context, tgts *payload.Mirror_Targets) ([]*payload.Mirror_Target, error) {
ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.MirrorRPCServiceName+"/"+vald.AdvertiseRPCName), "vald/gateway/vald/service/Mirror.advertises")
defer func() {
if span != nil {
span.End()
}
}()
reqInfo := &errdetails.RequestInfo{
ServingData: errdetails.Serialize(tgts),
}
resInfo := &errdetails.ResourceInfo{
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.AdvertiseRPCName,
}
resTgts := make([]*payload.Mirror_Target, 0, len(tgts.GetTargets()))
exists := make(map[string]struct{})
var mu sync.Mutex

err := m.gateway.DoMulti(ctx, m.connectedMirrorAddrs(), func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error {
ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Mirror.advertises/"+target)
defer func() {
if span != nil {
span.End()
}
}()
res, err := vc.Advertise(ctx, tgts)
if err != nil {
var attrs trace.Attributes
switch {
case errors.Is(err, context.Canceled):
err = status.WrapWithCanceled(
vald.AdvertiseRPCName+" API canceld", err, reqInfo, resInfo,
)
attrs = trace.StatusCodeCancelled(err.Error())
case errors.Is(err, context.DeadlineExceeded):
err = status.WrapWithCanceled(
vald.AdvertiseRPCName+" API deadline exceeded", err, reqInfo, resInfo,
)
attrs = trace.StatusCodeDeadlineExceeded(err.Error())
case errors.Is(err, errors.ErrGRPCClientConnNotFound("*")):
err = status.WrapWithInternal(
vald.AdvertiseRPCName+" API connection not found", err, reqInfo, resInfo,
)
attrs = trace.StatusCodeInternal(err.Error())
default:
var (
st *status.Status
msg string
)
st, msg, err = status.ParseError(err, codes.Internal,
"failed to parse "+vald.AdvertiseRPCName+" gRPC error response", reqInfo, resInfo,
)
attrs = trace.FromGRPCStatus(st.Code(), msg)

// When ingress is deleted, the controller's default backend results(Unimplemented error) are returned so that the connection should be disconnected.
// If it is a different namespace on the same cluster, the connection is automatically disconnected because the net.grpc health check fails.
Expand All @@ -346,7 +214,7 @@ func (m *mirr) advertises(ctx context.Context, tgts *payload.Mirror_Targets) ([]
}
}
}
log.Errorf("failed to process advertise requst to %s\terror: %s", target, err.Error())
log.Error("failed to send Register API to %s\t: %v", target, err)
if span != nil {
span.RecordError(err)
span.SetAttributes(attrs...)
Expand All @@ -356,8 +224,8 @@ func (m *mirr) advertises(ctx context.Context, tgts *payload.Mirror_Targets) ([]
}
if res != nil && len(res.GetTargets()) > 0 {
for _, tgt := range res.GetTargets() {
mu.Lock()
addr := net.JoinHostPort(tgt.Host, uint16(tgt.Port))
mu.Lock()
if _, ok := exists[addr]; !ok {
exists[addr] = struct{}{}
resTgts = append(resTgts, res.GetTargets()...)
Expand Down Expand Up @@ -431,6 +299,7 @@ func (m *mirr) Exist(_ context.Context, addr string) bool {
return ok
}

// MirrorTargets returns own address and the addresses of other mirror gateways to which this gateway is currently connected.
func (m *mirr) MirrorTargets() ([]*payload.Mirror_Target, error) {
addrs := m.gateway.GRPCClient().ConnectedAddrs()
tgts := make([]*payload.Mirror_Target, 0, len(addrs)+1)
Expand Down Expand Up @@ -460,6 +329,7 @@ func (m *mirr) isGatewayAddr(addr string) bool {
return ok
}

// connected returns the addresses of other mirror gateways to which this gateway is currently connected.
func (m *mirr) connectedMirrorAddrs() []string {
connectedAddrs := m.gateway.GRPCClient().ConnectedAddrs()
addrs := make([]string, 0, len(connectedAddrs))
Expand Down
Loading
Loading