Skip to content

Commit

Permalink
Fix up existing delegate sync workarounds and todos
Browse files Browse the repository at this point in the history
  • Loading branch information
kralicky committed Sep 13, 2023
1 parent e6c86ba commit 780ed03
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 50 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ require (
github.com/kralicky/gpkg v0.0.0-20220311205216-0d8ea9557555
github.com/kralicky/kmatch v0.0.0-20230301203314-20f658a0e56c
github.com/kralicky/ragu v1.0.11-0.20230627162951-2dd00e0cbbf3
github.com/kralicky/totem v1.2.0
github.com/kralicky/totem v1.2.1
github.com/kralicky/yaml/v3 v3.0.0-20220520012407-b0e7050bd81d
github.com/lestrrat-go/backoff/v2 v2.0.8
github.com/lestrrat-go/jwx v1.2.26
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1644,8 +1644,8 @@ github.com/kralicky/kmatch v0.0.0-20230301203314-20f658a0e56c h1:1kKbyl5GtGk/D+6
github.com/kralicky/kmatch v0.0.0-20230301203314-20f658a0e56c/go.mod h1:GXnSgs1TGXa8aX1hO9SUE1o3pLfggl5hkBMXdVWD8dc=
github.com/kralicky/ragu v1.0.11-0.20230627162951-2dd00e0cbbf3 h1:UB8Fk6c8Ait8DLtemgT/ItRdUpxJQHMDntqFopFt6/Q=
github.com/kralicky/ragu v1.0.11-0.20230627162951-2dd00e0cbbf3/go.mod h1:njQGnwU5IHm4Qx6/oOY0ll7Z6WA0apfP0ClpVA0M27Q=
github.com/kralicky/totem v1.2.0 h1:3pZuqhLrCYVbaxskkvubEm65AC/mquDLRTUZYGUxEEY=
github.com/kralicky/totem v1.2.0/go.mod h1:DFlQM6o5n1AWrU/+wd3hjLKoKZ+a0TPRUQxnCQvGKyY=
github.com/kralicky/totem v1.2.1 h1:ayd+IHbmmWhgFXJfzTHg0mJ2mr/IYHCh/nDqYyd6yCk=
github.com/kralicky/totem v1.2.1/go.mod h1:DFlQM6o5n1AWrU/+wd3hjLKoKZ+a0TPRUQxnCQvGKyY=
github.com/kralicky/yaml/v3 v3.0.0-20220520012407-b0e7050bd81d h1:kLfaaFdmCHKZCvL4DzQ7T9YsAVSBqZez34zaegldkls=
github.com/kralicky/yaml/v3 v3.0.0-20220520012407-b0e7050bd81d/go.mod h1:z4cKsjE0B5RlhFSbWcZNh70UEjtwrj8fpG1QXyU2KuI=
github.com/kralicky/zap v1.24.1-0.20230718165024-a2256218e4cc h1:FhngWZAuozJDezZPFebEy8QPbf7tw3MRv07RN8RLcg0=
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugins/apis/apiextensions/stream/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (w *targetedDelegatingClient[T]) Invoke(ctx context.Context, method string,
},
}
switch {
case w.target != nil:
case w.target != nil && w.target.Id != "":
respMsg, err := w.delegateClient.Request(ctx, &streamv1.DelegatedMessage{
Request: rpc,
Target: w.target,
Expand Down
10 changes: 3 additions & 7 deletions plugins/alerting/pkg/agent/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,18 +177,14 @@ func (s *AlertingNode) updateConfig(ctx context.Context, config *node.AlertingCa
}
eg.Wait()
s.configMu.Lock()
// TODO: this should ideally only be done if eg.Error() is nil, however
// there is a risk of an infinite sync loop since we have to manually
// re-sync when the driver status changes (see note in NewMetricsNode)
// Once we replace the sync manager with delegates, we can safely return
// errors from Sync and avoid the status condition workaround.
s.config = config
s.configMu.Unlock()
defer s.configMu.Unlock()

if err := eg.Error(); err != nil {
s.config.Conditions = (append(s.config.GetConditions(), err.Error()))
s.lg.With(zap.Error(err)).Errorf("%s node configuration error", s.capability)
return err
} else {
s.config = config
}

return nil
Expand Down
53 changes: 35 additions & 18 deletions plugins/alerting/pkg/node_backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@ package node_backend

import (
"context"
"errors"
"sync"

"github.com/google/go-cmp/cmp"
"github.com/rancher/opni/pkg/agent"
capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1"
corev1 "github.com/rancher/opni/pkg/apis/core/v1"
managementv1 "github.com/rancher/opni/pkg/apis/management/v1"
streamv1 "github.com/rancher/opni/pkg/apis/stream/v1"
"github.com/rancher/opni/pkg/auth/cluster"
"github.com/rancher/opni/pkg/capabilities"
"github.com/rancher/opni/pkg/capabilities/wellknown"
streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream"
"github.com/rancher/opni/pkg/storage"

"github.com/rancher/opni/pkg/util"
"github.com/rancher/opni/pkg/util/future"
"go.uber.org/zap"
Expand Down Expand Up @@ -92,23 +93,39 @@ var FallbackDefaultNodeSpec = &node.AlertingCapabilitySpec{
},
}

func (a *AlertingNodeBackend) requestNodeSync(ctx context.Context, node *corev1.Reference) {
_, err := a.delegate.Get().WithTarget(node).SyncNow(ctx, &capabilityv1.Filter{
CapabilityNames: []string{wellknown.CapabilityAlerting},
})
name := node.GetId()
if name == "" {
name = "(all)"
}
lg := a.lg.With(
"cluster", name,
"capability", wellknown.CapabilityAlerting,
)
if err != nil {
lg.With(zap.Error(err)).Error("failed to request node sync; nodes may not be updated immediately")
func (a *AlertingNodeBackend) requestNodeSync(ctx context.Context, target *corev1.Reference) error {
if target == nil || target.Id == "" {
panic("bug: target must be non-nil and have a non-empty ID. this logic was recently changed - please update the caller")
}
_, err := a.delegate.Get().
WithTarget(target).
SyncNow(ctx, &capabilityv1.Filter{CapabilityNames: []string{wellknown.CapabilityAlerting}})
return err
}

lg.Info("node sync requested")
func (a *AlertingNodeBackend) broadcastNodeSync(ctx context.Context) {
// keep any metadata in the context, but don't propagate cancellation
ctx = context.WithoutCancel(ctx)
var errs []error
a.delegate.Get().
WithBroadcastSelector(&corev1.ClusterSelector{}, func(reply any, msg *streamv1.BroadcastReplyList) error {
for _, resp := range msg.GetResponses() {
err := resp.GetReply().GetResponse().GetStatus().Err()
if err != nil {
target := resp.GetRef()
errs = append(errs, status.Errorf(codes.Internal, "failed to sync agent %s: %v", target.GetId(), err))
}
}
return nil
}).
SyncNow(ctx, &capabilityv1.Filter{
CapabilityNames: []string{wellknown.CapabilityAlerting},
})
if len(errs) > 0 {
a.lg.With(
zap.Error(errors.Join(errs...)),
).Warn("one or more agents failed to sync; they may not be updated immediately")
}
}

func (a *AlertingNodeBackend) buildResponse(oldCfg, newCfg *node.AlertingCapabilityConfig) *node.SyncResponse {
Expand Down Expand Up @@ -147,7 +164,7 @@ func (a *AlertingNodeBackend) SetDefaultConfiguration(ctx context.Context, spec
if err := a.capabilityKV.Get().DefaultCapabilitySpec.Delete(ctx); err != nil {
return nil, err
}
a.requestNodeSync(ctx, &corev1.Reference{})
a.broadcastNodeSync(ctx)
return &emptypb.Empty{}, nil
}

Expand All @@ -159,7 +176,7 @@ func (a *AlertingNodeBackend) SetDefaultConfiguration(ctx context.Context, spec
return nil, err
}

a.requestNodeSync(ctx, &corev1.Reference{Id: ""})
a.broadcastNodeSync(ctx)

return &emptypb.Empty{}, nil
}
Expand Down
19 changes: 2 additions & 17 deletions plugins/metrics/pkg/agent/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,6 @@ func NewMetricsNode(ct health.ConditionTracker, lg *zap.SugaredLogger) *MetricsN
mn.conditions.AddListener(mn.sendHealthUpdate)
mn.targetRunner.SetRemoteReaderClient(NewRemoteReader(&http.Client{}))

// FIXME: this is a hack, update the old sync code to use delegates instead
mn.conditions.AddListener(func(key string) {
if key == node.CondNodeDriver {
mn.logger.Info("forcing sync due to node driver status change")
go func() {
mn.doSync(context.TODO())
}()
}
})

return mn
}

Expand Down Expand Up @@ -332,17 +322,12 @@ func (m *MetricsNode) updateConfig(ctx context.Context, config *node.MetricsCapa

eg.Wait()

// TODO: this should ideally only be done if eg.Error() is nil, however
// there is a risk of an infinite sync loop since we have to manually
// re-sync when the driver status changes (see note in NewMetricsNode)
// Once we replace the sync manager with delegates, we can safely return
// errors from Sync and avoid the status condition workaround.
m.config = config

if err := eg.Error(); err != nil {
m.config.Conditions = append(config.Conditions, err.Error())
m.logger.With(zap.Error(err)).Error("node configuration error")
return err
}

m.config = config
return nil
}
3 changes: 3 additions & 0 deletions plugins/metrics/pkg/backend/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ func (m *MetricsBackend) Initialize(conf MetricsBackendConfig) {
}

func (m *MetricsBackend) requestNodeSync(ctx context.Context, target *corev1.Reference) error {
if target == nil || target.Id == "" {
panic("bug: target must be non-nil and have a non-empty ID. this logic was recently changed - please update the caller")
}
_, err := m.Delegate.
WithTarget(target).
SyncNow(ctx, &capabilityv1.Filter{CapabilityNames: []string{wellknown.CapabilityMetrics}})
Expand Down
1 change: 1 addition & 0 deletions plugins/metrics/pkg/backend/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (m *NodeServiceBackend) SetNodeConfiguration(ctx context.Context, req *node
return nil, err
}

m.requestNodeSync(ctx, req.Node)
return &emptypb.Empty{}, nil
}
if err := req.Spec.Validate(); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions plugins/metrics/pkg/backend/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (m *OpsServiceBackend) SetConfiguration(ctx context.Context, in *cortexops.
if err != nil {
return nil, err
}
m.requestNodeSync(ctx, &corev1.Reference{})
m.broadcastNodeSync(ctx)
return res, nil
}

Expand All @@ -63,7 +63,7 @@ func (m *OpsServiceBackend) ResetConfiguration(ctx context.Context, in *cortexop
if err != nil {
return nil, err
}
m.requestNodeSync(ctx, &corev1.Reference{})
m.broadcastNodeSync(ctx)
return res, nil
}

Expand All @@ -80,7 +80,7 @@ func (m *OpsServiceBackend) Install(ctx context.Context, in *emptypb.Empty) (*em
if err != nil {
return nil, err
}
m.requestNodeSync(ctx, &corev1.Reference{})
m.broadcastNodeSync(ctx)
return res, nil
}

Expand All @@ -100,7 +100,7 @@ func (m *OpsServiceBackend) Uninstall(ctx context.Context, in *emptypb.Empty) (*
if len(clustersWithCapability) > 0 {
return nil, status.Error(codes.FailedPrecondition, fmt.Sprintf("There are %d agents sending metrics to the Opni monitoring backend. Uninstall the capability on all agents before attempting to uninstall the Opni monitoring backend.", len(clustersWithCapability)))
}
defer m.requestNodeSync(ctx, &corev1.Reference{})
defer m.broadcastNodeSync(ctx)
return m.ClusterDriver.Uninstall(ctx, in)
}

Expand Down

0 comments on commit 780ed03

Please sign in to comment.