diff --git a/pkg/apis/capability/v1/capability.pb.go b/pkg/apis/capability/v1/capability.pb.go index 3549e319a3..6641c75f0e 100644 --- a/pkg/apis/capability/v1/capability.pb.go +++ b/pkg/apis/capability/v1/capability.pb.go @@ -11,7 +11,6 @@ import ( v1 "github.com/rancher/opni/pkg/apis/core/v1" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - _ "google.golang.org/protobuf/types/known/durationpb" emptypb "google.golang.org/protobuf/types/known/emptypb" structpb "google.golang.org/protobuf/types/known/structpb" timestamppb "google.golang.org/protobuf/types/known/timestamppb" @@ -532,9 +531,7 @@ var file_github_com_rancher_opni_pkg_apis_capability_v1_capability_proto_rawDesc 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x33, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x6f, 0x70, 0x6e, 0x69, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x63, 0x6f, 0x72, 0x65, - 0x2f, 0x76, 0x31, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1e, - 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, - 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, + 0x2f, 0x76, 0x31, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, 0x72, @@ -629,16 +626,11 @@ var file_github_com_rancher_opni_pkg_apis_capability_v1_capability_proto_rawDesc 0x0a, 0x07, 0x53, 0x79, 0x6e, 0x63, 0x4e, 0x6f, 0x77, 0x12, 0x12, 0x2e, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x2e, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, - 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x06, 0x8a, 0xf1, 0x04, 0x02, 0x08, 0x01, 0x32, 0x4d, 0x0a, - 0x0b, 0x4e, 0x6f, 0x64, 0x65, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x12, 0x3e, 0x0a, 0x0b, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x17, 0x2e, 0x63, 0x61, - 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x42, 0x30, 0x5a, 0x2e, - 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x61, 0x6e, 0x63, 0x68, - 0x65, 0x72, 0x2f, 0x6f, 0x70, 0x6e, 0x69, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, - 0x2f, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x2f, 0x76, 0x31, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x06, 0x8a, 0xf1, 0x04, 0x02, 0x08, 0x01, 0x42, 0x30, 0x5a, + 0x2e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x61, 0x6e, 0x63, + 0x68, 0x65, 0x72, 0x2f, 0x6f, 0x70, 0x6e, 0x69, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, + 0x73, 0x2f, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x2f, 0x76, 0x31, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -688,19 +680,17 @@ var file_github_com_rancher_opni_pkg_apis_capability_v1_capability_proto_depIdxs 9, // 13: capability.Backend.CancelUninstall:input_type -> core.Reference 12, // 14: capability.Backend.InstallerTemplate:input_type -> google.protobuf.Empty 3, // 15: capability.Node.SyncNow:input_type -> capability.Filter - 2, // 16: capability.NodeManager.RequestSync:input_type -> capability.SyncRequest - 1, // 17: capability.Backend.Info:output_type -> capability.Details - 12, // 18: capability.Backend.CanInstall:output_type -> google.protobuf.Empty - 5, // 19: capability.Backend.Install:output_type -> capability.InstallResponse - 8, // 20: capability.Backend.Status:output_type -> capability.NodeCapabilityStatus - 12, // 21: capability.Backend.Uninstall:output_type -> google.protobuf.Empty - 13, // 22: capability.Backend.UninstallStatus:output_type -> core.TaskStatus - 12, // 23: capability.Backend.CancelUninstall:output_type -> google.protobuf.Empty - 7, // 24: capability.Backend.InstallerTemplate:output_type -> capability.InstallerTemplateResponse - 12, // 25: capability.Node.SyncNow:output_type -> google.protobuf.Empty - 12, // 26: capability.NodeManager.RequestSync:output_type -> google.protobuf.Empty - 17, // [17:27] is the sub-list for method output_type - 7, // [7:17] is the sub-list for method input_type + 1, // 16: capability.Backend.Info:output_type -> capability.Details + 12, // 17: capability.Backend.CanInstall:output_type -> google.protobuf.Empty + 5, // 18: capability.Backend.Install:output_type -> capability.InstallResponse + 8, // 19: capability.Backend.Status:output_type -> capability.NodeCapabilityStatus + 12, // 20: capability.Backend.Uninstall:output_type -> google.protobuf.Empty + 13, // 21: capability.Backend.UninstallStatus:output_type -> core.TaskStatus + 12, // 22: capability.Backend.CancelUninstall:output_type -> google.protobuf.Empty + 7, // 23: capability.Backend.InstallerTemplate:output_type -> capability.InstallerTemplateResponse + 12, // 24: capability.Node.SyncNow:output_type -> google.protobuf.Empty + 16, // [16:25] is the sub-list for method output_type + 7, // [7:16] is the sub-list for method input_type 7, // [7:7] is the sub-list for extension type_name 7, // [7:7] is the sub-list for extension extendee 0, // [0:7] is the sub-list for field type_name @@ -817,7 +807,7 @@ func file_github_com_rancher_opni_pkg_apis_capability_v1_capability_proto_init() NumEnums: 1, NumMessages: 8, NumExtensions: 0, - NumServices: 3, + NumServices: 2, }, GoTypes: file_github_com_rancher_opni_pkg_apis_capability_v1_capability_proto_goTypes, DependencyIndexes: file_github_com_rancher_opni_pkg_apis_capability_v1_capability_proto_depIdxs, diff --git a/pkg/apis/capability/v1/capability.proto b/pkg/apis/capability/v1/capability.proto index 866e8e475b..88b391a8f6 100644 --- a/pkg/apis/capability/v1/capability.proto +++ b/pkg/apis/capability/v1/capability.proto @@ -4,7 +4,6 @@ package capability; import "github.com/kralicky/totem/extensions.proto"; import "github.com/rancher/opni/pkg/apis/core/v1/core.proto"; -import "google/protobuf/duration.proto"; import "google/protobuf/empty.proto"; import "google/protobuf/struct.proto"; import "google/protobuf/timestamp.proto"; @@ -55,10 +54,6 @@ service Node { } } -service NodeManager { - rpc RequestSync(SyncRequest) returns (google.protobuf.Empty); -} - message Details { string name = 1; string source = 2; diff --git a/pkg/apis/capability/v1/capability_grpc.pb.go b/pkg/apis/capability/v1/capability_grpc.pb.go index 19bbf0edfd..e74e1fc754 100644 --- a/pkg/apis/capability/v1/capability_grpc.pb.go +++ b/pkg/apis/capability/v1/capability_grpc.pb.go @@ -492,93 +492,3 @@ var Node_ServiceDesc = grpc.ServiceDesc{ Streams: []grpc.StreamDesc{}, Metadata: "github.com/rancher/opni/pkg/apis/capability/v1/capability.proto", } - -const ( - NodeManager_RequestSync_FullMethodName = "/capability.NodeManager/RequestSync" -) - -// NodeManagerClient is the client API for NodeManager service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type NodeManagerClient interface { - RequestSync(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) -} - -type nodeManagerClient struct { - cc grpc.ClientConnInterface -} - -func NewNodeManagerClient(cc grpc.ClientConnInterface) NodeManagerClient { - return &nodeManagerClient{cc} -} - -func (c *nodeManagerClient) RequestSync(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { - out := new(emptypb.Empty) - err := c.cc.Invoke(ctx, NodeManager_RequestSync_FullMethodName, in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// NodeManagerServer is the server API for NodeManager service. -// All implementations must embed UnimplementedNodeManagerServer -// for forward compatibility -type NodeManagerServer interface { - RequestSync(context.Context, *SyncRequest) (*emptypb.Empty, error) - mustEmbedUnimplementedNodeManagerServer() -} - -// UnimplementedNodeManagerServer must be embedded to have forward compatible implementations. -type UnimplementedNodeManagerServer struct { -} - -func (UnimplementedNodeManagerServer) RequestSync(context.Context, *SyncRequest) (*emptypb.Empty, error) { - return nil, status.Errorf(codes.Unimplemented, "method RequestSync not implemented") -} -func (UnimplementedNodeManagerServer) mustEmbedUnimplementedNodeManagerServer() {} - -// UnsafeNodeManagerServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to NodeManagerServer will -// result in compilation errors. -type UnsafeNodeManagerServer interface { - mustEmbedUnimplementedNodeManagerServer() -} - -func RegisterNodeManagerServer(s grpc.ServiceRegistrar, srv NodeManagerServer) { - s.RegisterService(&NodeManager_ServiceDesc, srv) -} - -func _NodeManager_RequestSync_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(SyncRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(NodeManagerServer).RequestSync(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: NodeManager_RequestSync_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(NodeManagerServer).RequestSync(ctx, req.(*SyncRequest)) - } - return interceptor(ctx, in, info, handler) -} - -// NodeManager_ServiceDesc is the grpc.ServiceDesc for NodeManager service. -// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var NodeManager_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "capability.NodeManager", - HandlerType: (*NodeManagerServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "RequestSync", - Handler: _NodeManager_RequestSync_Handler, - }, - }, - Streams: []grpc.StreamDesc{}, - Metadata: "github.com/rancher/opni/pkg/apis/capability/v1/capability.proto", -} diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 69986f627c..4887fbe386 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -31,7 +31,6 @@ import ( bootstrapv1 "github.com/rancher/opni/pkg/apis/bootstrap/v1" bootstrapv2 "github.com/rancher/opni/pkg/apis/bootstrap/v2" - capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" controlv1 "github.com/rancher/opni/pkg/apis/control/v1" corev1 "github.com/rancher/opni/pkg/apis/core/v1" streamv1 "github.com/rancher/opni/pkg/apis/stream/v1" @@ -66,7 +65,6 @@ type Gateway struct { storageBackend storage.Backend capBackendStore capabilities.BackendStore - syncRequester *SyncRequester } type GatewayOptions struct { @@ -270,10 +268,9 @@ func NewGateway(ctx context.Context, conf *config.GatewayConfig, pl plugins.Load // set up stream server listener := health.NewListener() monitor := health.NewMonitor(health.WithLogger(lg.Named("monitor"))) - sync := NewSyncRequester(lg) delegate := NewDelegateServer(storageBackend, lg) // set up agent connection handlers - agentHandler := MultiConnectionHandler(listener, sync, delegate) + agentHandler := MultiConnectionHandler(listener, delegate) go monitor.Run(ctx, listener) streamSvc := NewStreamServer(agentHandler, storageBackend, httpServer.metricsRegisterer, lg) @@ -302,7 +299,6 @@ func NewGateway(ctx context.Context, conf *config.GatewayConfig, pl plugins.Load httpServer: httpServer, grpcServer: grpcServer, statusQuerier: monitor, - syncRequester: sync, } waitctx.Go(ctx, func() { @@ -367,11 +363,6 @@ func (g *Gateway) CapabilitiesStore() capabilities.BackendStore { return g.capBackendStore } -// Implements management.CapabilitiesDataSource -func (g *Gateway) NodeManagerServer() capabilityv1.NodeManagerServer { - return g.syncRequester -} - // Implements management.HealthStatusDataSource func (g *Gateway) GetClusterHealthStatus(ref *corev1.Reference) (*corev1.HealthStatus, error) { hs := g.statusQuerier.GetHealthStatus(ref.Id) diff --git a/pkg/gateway/sync.go b/pkg/gateway/sync.go deleted file mode 100644 index 86d5c62551..0000000000 --- a/pkg/gateway/sync.go +++ /dev/null @@ -1,119 +0,0 @@ -package gateway - -import ( - "context" - "fmt" - "math/rand" - "sync" - "time" - - "github.com/prometheus/client_golang/prometheus" - agentv1 "github.com/rancher/opni/pkg/agent" - capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" - corev1 "github.com/rancher/opni/pkg/apis/core/v1" - "github.com/rancher/opni/pkg/auth/cluster" - "github.com/rancher/opni/pkg/util" - "github.com/rancher/opni/pkg/validation" - "go.uber.org/zap" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/emptypb" -) - -var ( - mSyncRequests = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "opni", - Name: "server_sync_requests_total", - Help: "Total number of sync requests sent to agents", - }, []string{"cluster_id", "code", "code_text"}) -) - -type SyncRequester struct { - capabilityv1.UnsafeNodeManagerServer - mu sync.RWMutex - activeAgents map[string]agentv1.ClientSet - logger *zap.SugaredLogger -} - -func NewSyncRequester(lg *zap.SugaredLogger) *SyncRequester { - return &SyncRequester{ - activeAgents: make(map[string]agentv1.ClientSet), - logger: lg.Named("sync"), - } -} - -func (f *SyncRequester) HandleAgentConnection(ctx context.Context, clientSet agentv1.ClientSet) { - f.mu.Lock() - id := cluster.StreamAuthorizedID(ctx) - f.activeAgents[id] = clientSet - f.logger.With("id", id).Debug("agent connected") - f.mu.Unlock() - - // blocks until ctx is canceled - // send a periodic sync request to the agent every 5-10 minutes - f.runPeriodicSync(ctx, &capabilityv1.SyncRequest{ - Cluster: &corev1.Reference{ - Id: id, - }, - }, 5*time.Minute, 5*time.Minute) - - f.mu.Lock() - delete(f.activeAgents, id) - f.logger.With("id", id).Debug("agent disconnected") - f.mu.Unlock() -} - -// Implements capabilityv1.NodeManagerServer -func (f *SyncRequester) RequestSync(ctx context.Context, req *capabilityv1.SyncRequest) (*emptypb.Empty, error) { - if err := validation.Validate(req); err != nil { - return nil, status.Errorf(codes.InvalidArgument, err.Error()) - } - - f.mu.RLock() - defer f.mu.RUnlock() - - toSync := []agentv1.ClientSet{} - - if req.GetCluster().GetId() == "" { - for _, clientSet := range f.activeAgents { - toSync = append(toSync, clientSet) - } - } else { - if clientSet, ok := f.activeAgents[req.GetCluster().GetId()]; ok { - toSync = append(toSync, clientSet) - } - } - - for _, clientSet := range toSync { - f.logger.With( - "agentId", req.GetCluster().GetId(), - "capabilities", req.GetFilter().GetCapabilityNames(), - ).Debug("sending sync request to agent") - _, err := clientSet.SyncNow(ctx, req.GetFilter()) - code := status.Code(err) - mSyncRequests.WithLabelValues(req.GetCluster().GetId(), fmt.Sprint(code), code.String()).Inc() - if err != nil { - f.logger.With( - zap.Error(err), - ).Warn("sync request failed") - } - } - - return &emptypb.Empty{}, nil -} - -func (f *SyncRequester) runPeriodicSync(ctx context.Context, req *capabilityv1.SyncRequest, period time.Duration, jitter time.Duration) { - timer := time.NewTimer(period + time.Duration(rand.Int63n(int64(jitter)))) - defer timer.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-timer.C: - f.logger.Debug("running periodic sync") - go f.RequestSync(ctx, util.ProtoClone(req)) - timer.Reset(period + time.Duration(rand.Int63n(int64(jitter)))) - } - } -} diff --git a/pkg/management/server.go b/pkg/management/server.go index 9eb8e1ff63..991b00772d 100644 --- a/pkg/management/server.go +++ b/pkg/management/server.go @@ -22,7 +22,6 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" - capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" corev1 "github.com/rancher/opni/pkg/apis/core/v1" managementv1 "github.com/rancher/opni/pkg/apis/management/v1" "github.com/rancher/opni/pkg/caching" @@ -52,7 +51,6 @@ type CoreDataSource interface { // server needs to serve capabilities-related endpoints type CapabilitiesDataSource interface { CapabilitiesStore() capabilities.BackendStore - NodeManagerServer() capabilityv1.NodeManagerServer } type HealthStatusDataSource interface { @@ -148,15 +146,9 @@ func NewServer( otelgrpc.UnaryServerInterceptor()), ) managementv1.RegisterManagementServer(m.grpcServer, m) - if m.capabilitiesDataSource != nil { - capabilityv1.RegisterNodeManagerServer(m.grpcServer, m.capabilitiesDataSource.NodeManagerServer()) - } pluginLoader.Hook(hooks.OnLoadM(func(sp types.SystemPlugin, md meta.PluginMeta) { go sp.ServeManagementAPI(m) - if m.capabilitiesDataSource != nil { - go sp.ServeNodeManagerServer(m.capabilitiesDataSource.NodeManagerServer()) - } go func() { if err := sp.ServeAPIExtensions(m.config.GRPCListenAddress); err != nil { lg.With( diff --git a/pkg/management/server_test.go b/pkg/management/server_test.go index 9aaeddc8ae..063208b95d 100644 --- a/pkg/management/server_test.go +++ b/pkg/management/server_test.go @@ -3,7 +3,6 @@ package management_test import ( "context" - capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" managementv1 "github.com/rancher/opni/pkg/apis/management/v1" "github.com/rancher/opni/pkg/capabilities" "github.com/rancher/opni/pkg/config/v1beta1" @@ -27,10 +26,6 @@ func (t testCapabilityDataSource) CapabilitiesStore() capabilities.BackendStore return t.store } -func (t testCapabilityDataSource) NodeManagerServer() capabilityv1.NodeManagerServer { - return capabilityv1.UnimplementedNodeManagerServer{} -} - var _ = Describe("Server", Ordered, Label("unit"), func() { var tv *testVars var capBackendStore capabilities.BackendStore diff --git a/pkg/plugins/apis/apiextensions/stream/delegate.go b/pkg/plugins/apis/apiextensions/stream/delegate.go index ce7a3a9612..7b3c0b19f1 100644 --- a/pkg/plugins/apis/apiextensions/stream/delegate.go +++ b/pkg/plugins/apis/apiextensions/stream/delegate.go @@ -3,9 +3,10 @@ package stream import ( "context" "fmt" + "strings" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "strings" "github.com/kralicky/totem" corev1 "github.com/rancher/opni/pkg/apis/core/v1" @@ -17,8 +18,7 @@ import ( // Aggregator allows the user to aggregate the responses from a broadcast request, and store the result in reply. type Aggregator func(reply any, msg *streamv1.BroadcastReplyList) error -// EmptyNothingAggregator is an aggregator to use if you don't need to do any aggregation, or don't care about the response. -var EmptyNothingAggregator Aggregator = func(any, *streamv1.BroadcastReplyList) error { +func DiscardReplies(any, *streamv1.BroadcastReplyList) error { return nil } diff --git a/pkg/plugins/apis/system/plugin.go b/pkg/plugins/apis/system/plugin.go index 10b1e7a71d..1ecdb8b046 100644 --- a/pkg/plugins/apis/system/plugin.go +++ b/pkg/plugins/apis/system/plugin.go @@ -6,7 +6,6 @@ import ( "time" "github.com/hashicorp/go-plugin" - capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" managementv1 "github.com/rancher/opni/pkg/apis/management/v1" "github.com/rancher/opni/pkg/caching" "github.com/rancher/opni/pkg/plugins" @@ -22,7 +21,6 @@ import ( type SystemPluginClient interface { UseManagementAPI(managementv1.ManagementClient) - UseNodeManagerClient(capabilityv1.NodeManagerClient) UseCachingProvider(caching.CachingProvider[proto.Message]) UseKeyValueStore(KeyValueStoreClient) UseAPIExtensions(ExtensionClientInterface) @@ -33,7 +31,6 @@ type SystemPluginClient interface { type UnimplementedSystemPluginClient struct{} func (UnimplementedSystemPluginClient) UseManagementAPI(managementv1.ManagementClient) {} -func (UnimplementedSystemPluginClient) UseNodeManagerClient(capabilityv1.NodeManagerClient) {} func (UnimplementedSystemPluginClient) UseKeyValueStore(KeyValueStoreClient) {} func (UnimplementedSystemPluginClient) UseAPIExtensions(ExtensionClientInterface) {} func (UnimplementedSystemPluginClient) UseCachingProvider(caching.CachingProvider[proto.Message]) {} @@ -41,7 +38,6 @@ func (UnimplementedSystemPluginClient) mustEmbedUnimplementedSystemPluginClient( type SystemPluginServer interface { ServeManagementAPI(managementv1.ManagementServer) - ServeNodeManagerServer(capabilityv1.NodeManagerServer) ServeKeyValueStore(storage.KeyValueStore) ServeAPIExtensions(dialAddress string) error ServeCachingProvider() @@ -103,17 +99,6 @@ func (c *systemPluginClientImpl) UseManagementAPI(_ context.Context, in *BrokerI return &emptypb.Empty{}, nil } -func (c *systemPluginClientImpl) UseNodeManagerClient(_ context.Context, in *BrokerID) (*emptypb.Empty, error) { - cc, err := c.broker.Dial(in.Id) - if err != nil { - return nil, err - } - defer cc.Close() - client := capabilityv1.NewNodeManagerClient(cc) - c.client.UseNodeManagerClient(client) - return &emptypb.Empty{}, nil -} - func (c *systemPluginClientImpl) UseKeyValueStore(_ context.Context, in *BrokerID) (*emptypb.Empty, error) { cc, err := c.broker.Dial(in.Id) if err != nil { @@ -194,19 +179,6 @@ func (s *systemPluginHandler) ServeManagementAPI(api managementv1.ManagementServ ) } -func (s *systemPluginHandler) ServeNodeManagerServer(api capabilityv1.NodeManagerServer) { - s.serveSystemApi( - func(srv *grpc.Server) { - capabilityv1.RegisterNodeManagerServer(srv, api) - }, - func(id uint32) { - s.client.UseNodeManagerClient(s.ctx, &BrokerID{ - Id: id, - }) - }, - ) -} - func (s *systemPluginHandler) ServeKeyValueStore(store storage.KeyValueStore) { kvStoreSrv := NewKVStoreServer(store) s.serveSystemApi( diff --git a/pkg/plugins/apis/system/system.pb.go b/pkg/plugins/apis/system/system.pb.go index af72a8dfea..e684b05310 100644 --- a/pkg/plugins/apis/system/system.pb.go +++ b/pkg/plugins/apis/system/system.pb.go @@ -967,53 +967,49 @@ var file_github_com_rancher_opni_pkg_plugins_apis_system_system_proto_rawDesc = 0x52, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x70, 0x72, 0x65, 0x76, 0x69, 0x6f, 0x75, 0x73, 0x22, 0x20, 0x0a, 0x09, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x07, 0x0a, 0x03, 0x50, 0x75, 0x74, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x44, 0x65, 0x6c, 0x65, - 0x74, 0x65, 0x10, 0x01, 0x32, 0xcd, 0x02, 0x0a, 0x06, 0x53, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x12, + 0x74, 0x65, 0x10, 0x01, 0x32, 0x8b, 0x02, 0x0a, 0x06, 0x53, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x12, 0x3c, 0x0a, 0x10, 0x55, 0x73, 0x65, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x41, 0x50, 0x49, 0x12, 0x10, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x49, 0x44, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x40, 0x0a, - 0x14, 0x55, 0x73, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x43, - 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x12, 0x10, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x42, - 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x49, 0x44, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, - 0x3c, 0x0a, 0x10, 0x55, 0x73, 0x65, 0x4b, 0x65, 0x79, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x53, 0x74, - 0x6f, 0x72, 0x65, 0x12, 0x10, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x42, 0x72, 0x6f, - 0x6b, 0x65, 0x72, 0x49, 0x44, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x3f, 0x0a, - 0x10, 0x55, 0x73, 0x65, 0x41, 0x50, 0x49, 0x45, 0x78, 0x74, 0x65, 0x6e, 0x73, 0x69, 0x6f, 0x6e, - 0x73, 0x12, 0x13, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x44, 0x69, 0x61, 0x6c, 0x41, - 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x44, - 0x0a, 0x12, 0x55, 0x73, 0x65, 0x43, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x67, 0x50, 0x72, 0x6f, 0x76, - 0x69, 0x64, 0x65, 0x72, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x67, - 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, - 0x6d, 0x70, 0x74, 0x79, 0x32, 0xdb, 0x02, 0x0a, 0x0d, 0x4b, 0x65, 0x79, 0x56, 0x61, 0x6c, 0x75, - 0x65, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x12, 0x2e, 0x0a, 0x03, 0x50, 0x75, 0x74, 0x12, 0x12, 0x2e, - 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x13, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2e, 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x12, 0x2e, - 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x13, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x05, 0x57, 0x61, 0x74, 0x63, 0x68, 0x12, - 0x14, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x57, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x57, - 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x12, 0x37, - 0x0a, 0x06, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x12, 0x15, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, - 0x6d, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x16, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3d, 0x0a, 0x08, 0x4c, 0x69, 0x73, 0x74, 0x4b, - 0x65, 0x79, 0x73, 0x12, 0x17, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x4c, 0x69, 0x73, - 0x74, 0x4b, 0x65, 0x79, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x73, - 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4b, 0x65, 0x79, 0x73, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x07, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, - 0x79, 0x12, 0x16, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x48, 0x69, 0x73, 0x74, 0x6f, - 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x73, 0x79, 0x73, 0x74, - 0x65, 0x6d, 0x2e, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x6f, 0x70, 0x6e, 0x69, 0x2f, 0x70, 0x6b, - 0x67, 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x73, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x73, - 0x79, 0x73, 0x74, 0x65, 0x6d, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x3c, 0x0a, + 0x10, 0x55, 0x73, 0x65, 0x4b, 0x65, 0x79, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x53, 0x74, 0x6f, 0x72, + 0x65, 0x12, 0x10, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x42, 0x72, 0x6f, 0x6b, 0x65, + 0x72, 0x49, 0x44, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x3f, 0x0a, 0x10, 0x55, + 0x73, 0x65, 0x41, 0x50, 0x49, 0x45, 0x78, 0x74, 0x65, 0x6e, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x12, + 0x13, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x44, 0x69, 0x61, 0x6c, 0x41, 0x64, 0x64, + 0x72, 0x65, 0x73, 0x73, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x44, 0x0a, 0x12, + 0x55, 0x73, 0x65, 0x43, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x67, 0x50, 0x72, 0x6f, 0x76, 0x69, 0x64, + 0x65, 0x72, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, + 0x74, 0x79, 0x32, 0xdb, 0x02, 0x0a, 0x0d, 0x4b, 0x65, 0x79, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x53, + 0x74, 0x6f, 0x72, 0x65, 0x12, 0x2e, 0x0a, 0x03, 0x50, 0x75, 0x74, 0x12, 0x12, 0x2e, 0x73, 0x79, + 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x13, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2e, 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x12, 0x2e, 0x73, 0x79, + 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x13, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x05, 0x57, 0x61, 0x74, 0x63, 0x68, 0x12, 0x14, 0x2e, + 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x57, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x57, 0x61, 0x74, + 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x12, 0x37, 0x0a, 0x06, + 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x12, 0x15, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, + 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, + 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3d, 0x0a, 0x08, 0x4c, 0x69, 0x73, 0x74, 0x4b, 0x65, 0x79, + 0x73, 0x12, 0x17, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4b, + 0x65, 0x79, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x73, 0x79, 0x73, + 0x74, 0x65, 0x6d, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4b, 0x65, 0x79, 0x73, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x07, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x12, + 0x16, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, + 0x2e, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, + 0x61, 0x6e, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x6f, 0x70, 0x6e, 0x69, 0x2f, 0x70, 0x6b, 0x67, 0x2f, + 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x73, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x73, 0x79, 0x73, + 0x74, 0x65, 0x6d, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1057,29 +1053,27 @@ var file_github_com_rancher_opni_pkg_plugins_apis_system_system_proto_depIdxs = 13, // 3: system.WatchResponse.current:type_name -> system.KeyRevision 13, // 4: system.WatchResponse.previous:type_name -> system.KeyRevision 1, // 5: system.System.UseManagementAPI:input_type -> system.BrokerID - 1, // 6: system.System.UseNodeManagerClient:input_type -> system.BrokerID - 1, // 7: system.System.UseKeyValueStore:input_type -> system.BrokerID - 2, // 8: system.System.UseAPIExtensions:input_type -> system.DialAddress - 17, // 9: system.System.UseCachingProvider:input_type -> google.protobuf.Empty - 3, // 10: system.KeyValueStore.Put:input_type -> system.PutRequest - 5, // 11: system.KeyValueStore.Get:input_type -> system.GetRequest - 14, // 12: system.KeyValueStore.Watch:input_type -> system.WatchRequest - 7, // 13: system.KeyValueStore.Delete:input_type -> system.DeleteRequest - 9, // 14: system.KeyValueStore.ListKeys:input_type -> system.ListKeysRequest - 11, // 15: system.KeyValueStore.History:input_type -> system.HistoryRequest - 17, // 16: system.System.UseManagementAPI:output_type -> google.protobuf.Empty - 17, // 17: system.System.UseNodeManagerClient:output_type -> google.protobuf.Empty - 17, // 18: system.System.UseKeyValueStore:output_type -> google.protobuf.Empty - 17, // 19: system.System.UseAPIExtensions:output_type -> google.protobuf.Empty - 17, // 20: system.System.UseCachingProvider:output_type -> google.protobuf.Empty - 4, // 21: system.KeyValueStore.Put:output_type -> system.PutResponse - 6, // 22: system.KeyValueStore.Get:output_type -> system.GetResponse - 15, // 23: system.KeyValueStore.Watch:output_type -> system.WatchResponse - 8, // 24: system.KeyValueStore.Delete:output_type -> system.DeleteResponse - 10, // 25: system.KeyValueStore.ListKeys:output_type -> system.ListKeysResponse - 12, // 26: system.KeyValueStore.History:output_type -> system.HistoryResponse - 16, // [16:27] is the sub-list for method output_type - 5, // [5:16] is the sub-list for method input_type + 1, // 6: system.System.UseKeyValueStore:input_type -> system.BrokerID + 2, // 7: system.System.UseAPIExtensions:input_type -> system.DialAddress + 17, // 8: system.System.UseCachingProvider:input_type -> google.protobuf.Empty + 3, // 9: system.KeyValueStore.Put:input_type -> system.PutRequest + 5, // 10: system.KeyValueStore.Get:input_type -> system.GetRequest + 14, // 11: system.KeyValueStore.Watch:input_type -> system.WatchRequest + 7, // 12: system.KeyValueStore.Delete:input_type -> system.DeleteRequest + 9, // 13: system.KeyValueStore.ListKeys:input_type -> system.ListKeysRequest + 11, // 14: system.KeyValueStore.History:input_type -> system.HistoryRequest + 17, // 15: system.System.UseManagementAPI:output_type -> google.protobuf.Empty + 17, // 16: system.System.UseKeyValueStore:output_type -> google.protobuf.Empty + 17, // 17: system.System.UseAPIExtensions:output_type -> google.protobuf.Empty + 17, // 18: system.System.UseCachingProvider:output_type -> google.protobuf.Empty + 4, // 19: system.KeyValueStore.Put:output_type -> system.PutResponse + 6, // 20: system.KeyValueStore.Get:output_type -> system.GetResponse + 15, // 21: system.KeyValueStore.Watch:output_type -> system.WatchResponse + 8, // 22: system.KeyValueStore.Delete:output_type -> system.DeleteResponse + 10, // 23: system.KeyValueStore.ListKeys:output_type -> system.ListKeysResponse + 12, // 24: system.KeyValueStore.History:output_type -> system.HistoryResponse + 15, // [15:25] is the sub-list for method output_type + 5, // [5:15] is the sub-list for method input_type 5, // [5:5] is the sub-list for extension type_name 5, // [5:5] is the sub-list for extension extendee 0, // [0:5] is the sub-list for field type_name diff --git a/pkg/plugins/apis/system/system.proto b/pkg/plugins/apis/system/system.proto index aebf2ed57e..5deb6a8a5b 100644 --- a/pkg/plugins/apis/system/system.proto +++ b/pkg/plugins/apis/system/system.proto @@ -10,7 +10,6 @@ option go_package = "github.com/rancher/opni/pkg/plugins/apis/system"; service System { rpc UseManagementAPI(BrokerID) returns (google.protobuf.Empty); - rpc UseNodeManagerClient(BrokerID) returns (google.protobuf.Empty); rpc UseKeyValueStore(BrokerID) returns (google.protobuf.Empty); rpc UseAPIExtensions(DialAddress) returns (google.protobuf.Empty); rpc UseCachingProvider(google.protobuf.Empty) returns (google.protobuf.Empty); diff --git a/pkg/plugins/apis/system/system_grpc.pb.go b/pkg/plugins/apis/system/system_grpc.pb.go index 0a55413014..2377c8439c 100644 --- a/pkg/plugins/apis/system/system_grpc.pb.go +++ b/pkg/plugins/apis/system/system_grpc.pb.go @@ -20,11 +20,10 @@ import ( const _ = grpc.SupportPackageIsVersion7 const ( - System_UseManagementAPI_FullMethodName = "/system.System/UseManagementAPI" - System_UseNodeManagerClient_FullMethodName = "/system.System/UseNodeManagerClient" - System_UseKeyValueStore_FullMethodName = "/system.System/UseKeyValueStore" - System_UseAPIExtensions_FullMethodName = "/system.System/UseAPIExtensions" - System_UseCachingProvider_FullMethodName = "/system.System/UseCachingProvider" + System_UseManagementAPI_FullMethodName = "/system.System/UseManagementAPI" + System_UseKeyValueStore_FullMethodName = "/system.System/UseKeyValueStore" + System_UseAPIExtensions_FullMethodName = "/system.System/UseAPIExtensions" + System_UseCachingProvider_FullMethodName = "/system.System/UseCachingProvider" ) // SystemClient is the client API for System service. @@ -32,7 +31,6 @@ const ( // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type SystemClient interface { UseManagementAPI(ctx context.Context, in *BrokerID, opts ...grpc.CallOption) (*emptypb.Empty, error) - UseNodeManagerClient(ctx context.Context, in *BrokerID, opts ...grpc.CallOption) (*emptypb.Empty, error) UseKeyValueStore(ctx context.Context, in *BrokerID, opts ...grpc.CallOption) (*emptypb.Empty, error) UseAPIExtensions(ctx context.Context, in *DialAddress, opts ...grpc.CallOption) (*emptypb.Empty, error) UseCachingProvider(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*emptypb.Empty, error) @@ -55,15 +53,6 @@ func (c *systemClient) UseManagementAPI(ctx context.Context, in *BrokerID, opts return out, nil } -func (c *systemClient) UseNodeManagerClient(ctx context.Context, in *BrokerID, opts ...grpc.CallOption) (*emptypb.Empty, error) { - out := new(emptypb.Empty) - err := c.cc.Invoke(ctx, System_UseNodeManagerClient_FullMethodName, in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - func (c *systemClient) UseKeyValueStore(ctx context.Context, in *BrokerID, opts ...grpc.CallOption) (*emptypb.Empty, error) { out := new(emptypb.Empty) err := c.cc.Invoke(ctx, System_UseKeyValueStore_FullMethodName, in, out, opts...) @@ -96,7 +85,6 @@ func (c *systemClient) UseCachingProvider(ctx context.Context, in *emptypb.Empty // for forward compatibility type SystemServer interface { UseManagementAPI(context.Context, *BrokerID) (*emptypb.Empty, error) - UseNodeManagerClient(context.Context, *BrokerID) (*emptypb.Empty, error) UseKeyValueStore(context.Context, *BrokerID) (*emptypb.Empty, error) UseAPIExtensions(context.Context, *DialAddress) (*emptypb.Empty, error) UseCachingProvider(context.Context, *emptypb.Empty) (*emptypb.Empty, error) @@ -110,9 +98,6 @@ type UnimplementedSystemServer struct { func (UnimplementedSystemServer) UseManagementAPI(context.Context, *BrokerID) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method UseManagementAPI not implemented") } -func (UnimplementedSystemServer) UseNodeManagerClient(context.Context, *BrokerID) (*emptypb.Empty, error) { - return nil, status.Errorf(codes.Unimplemented, "method UseNodeManagerClient not implemented") -} func (UnimplementedSystemServer) UseKeyValueStore(context.Context, *BrokerID) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method UseKeyValueStore not implemented") } @@ -153,24 +138,6 @@ func _System_UseManagementAPI_Handler(srv interface{}, ctx context.Context, dec return interceptor(ctx, in, info, handler) } -func _System_UseNodeManagerClient_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(BrokerID) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(SystemServer).UseNodeManagerClient(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: System_UseNodeManagerClient_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(SystemServer).UseNodeManagerClient(ctx, req.(*BrokerID)) - } - return interceptor(ctx, in, info, handler) -} - func _System_UseKeyValueStore_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(BrokerID) if err := dec(in); err != nil { @@ -236,10 +203,6 @@ var System_ServiceDesc = grpc.ServiceDesc{ MethodName: "UseManagementAPI", Handler: _System_UseManagementAPI_Handler, }, - { - MethodName: "UseNodeManagerClient", - Handler: _System_UseNodeManagerClient_Handler, - }, { MethodName: "UseKeyValueStore", Handler: _System_UseKeyValueStore_Handler, diff --git a/pkg/test/mock/capability/backend.go b/pkg/test/mock/capability/backend.go index f31801d425..2b67de46a3 100644 --- a/pkg/test/mock/capability/backend.go +++ b/pkg/test/mock/capability/backend.go @@ -515,131 +515,3 @@ func (mr *MockUnsafeNodeServerMockRecorder) mustEmbedUnimplementedNodeServer() * mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "mustEmbedUnimplementedNodeServer", reflect.TypeOf((*MockUnsafeNodeServer)(nil).mustEmbedUnimplementedNodeServer)) } - -// MockNodeManagerClient is a mock of NodeManagerClient interface. -type MockNodeManagerClient struct { - ctrl *gomock.Controller - recorder *MockNodeManagerClientMockRecorder -} - -// MockNodeManagerClientMockRecorder is the mock recorder for MockNodeManagerClient. -type MockNodeManagerClientMockRecorder struct { - mock *MockNodeManagerClient -} - -// NewMockNodeManagerClient creates a new mock instance. -func NewMockNodeManagerClient(ctrl *gomock.Controller) *MockNodeManagerClient { - mock := &MockNodeManagerClient{ctrl: ctrl} - mock.recorder = &MockNodeManagerClientMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockNodeManagerClient) EXPECT() *MockNodeManagerClientMockRecorder { - return m.recorder -} - -// RequestSync mocks base method. -func (m *MockNodeManagerClient) RequestSync(ctx context.Context, in *v1.SyncRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { - m.ctrl.T.Helper() - varargs := []interface{}{ctx, in} - for _, a := range opts { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "RequestSync", varargs...) - ret0, _ := ret[0].(*emptypb.Empty) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// RequestSync indicates an expected call of RequestSync. -func (mr *MockNodeManagerClientMockRecorder) RequestSync(ctx, in interface{}, opts ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, in}, opts...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestSync", reflect.TypeOf((*MockNodeManagerClient)(nil).RequestSync), varargs...) -} - -// MockNodeManagerServer is a mock of NodeManagerServer interface. -type MockNodeManagerServer struct { - ctrl *gomock.Controller - recorder *MockNodeManagerServerMockRecorder -} - -// MockNodeManagerServerMockRecorder is the mock recorder for MockNodeManagerServer. -type MockNodeManagerServerMockRecorder struct { - mock *MockNodeManagerServer -} - -// NewMockNodeManagerServer creates a new mock instance. -func NewMockNodeManagerServer(ctrl *gomock.Controller) *MockNodeManagerServer { - mock := &MockNodeManagerServer{ctrl: ctrl} - mock.recorder = &MockNodeManagerServerMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockNodeManagerServer) EXPECT() *MockNodeManagerServerMockRecorder { - return m.recorder -} - -// RequestSync mocks base method. -func (m *MockNodeManagerServer) RequestSync(arg0 context.Context, arg1 *v1.SyncRequest) (*emptypb.Empty, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RequestSync", arg0, arg1) - ret0, _ := ret[0].(*emptypb.Empty) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// RequestSync indicates an expected call of RequestSync. -func (mr *MockNodeManagerServerMockRecorder) RequestSync(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestSync", reflect.TypeOf((*MockNodeManagerServer)(nil).RequestSync), arg0, arg1) -} - -// mustEmbedUnimplementedNodeManagerServer mocks base method. -func (m *MockNodeManagerServer) mustEmbedUnimplementedNodeManagerServer() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "mustEmbedUnimplementedNodeManagerServer") -} - -// mustEmbedUnimplementedNodeManagerServer indicates an expected call of mustEmbedUnimplementedNodeManagerServer. -func (mr *MockNodeManagerServerMockRecorder) mustEmbedUnimplementedNodeManagerServer() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "mustEmbedUnimplementedNodeManagerServer", reflect.TypeOf((*MockNodeManagerServer)(nil).mustEmbedUnimplementedNodeManagerServer)) -} - -// MockUnsafeNodeManagerServer is a mock of UnsafeNodeManagerServer interface. -type MockUnsafeNodeManagerServer struct { - ctrl *gomock.Controller - recorder *MockUnsafeNodeManagerServerMockRecorder -} - -// MockUnsafeNodeManagerServerMockRecorder is the mock recorder for MockUnsafeNodeManagerServer. -type MockUnsafeNodeManagerServerMockRecorder struct { - mock *MockUnsafeNodeManagerServer -} - -// NewMockUnsafeNodeManagerServer creates a new mock instance. -func NewMockUnsafeNodeManagerServer(ctrl *gomock.Controller) *MockUnsafeNodeManagerServer { - mock := &MockUnsafeNodeManagerServer{ctrl: ctrl} - mock.recorder = &MockUnsafeNodeManagerServerMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockUnsafeNodeManagerServer) EXPECT() *MockUnsafeNodeManagerServerMockRecorder { - return m.recorder -} - -// mustEmbedUnimplementedNodeManagerServer mocks base method. -func (m *MockUnsafeNodeManagerServer) mustEmbedUnimplementedNodeManagerServer() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "mustEmbedUnimplementedNodeManagerServer") -} - -// mustEmbedUnimplementedNodeManagerServer indicates an expected call of mustEmbedUnimplementedNodeManagerServer. -func (mr *MockUnsafeNodeManagerServerMockRecorder) mustEmbedUnimplementedNodeManagerServer() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "mustEmbedUnimplementedNodeManagerServer", reflect.TypeOf((*MockUnsafeNodeManagerServer)(nil).mustEmbedUnimplementedNodeManagerServer)) -} diff --git a/plugins/alerting/pkg/alerting/plugin.go b/plugins/alerting/pkg/alerting/plugin.go index dd364a362b..ea05eebf57 100644 --- a/plugins/alerting/pkg/alerting/plugin.go +++ b/plugins/alerting/pkg/alerting/plugin.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/rancher/opni/pkg/agent" "github.com/rancher/opni/pkg/config/v1beta1" "github.com/rancher/opni/pkg/management" "github.com/rancher/opni/pkg/metrics/collector" @@ -29,7 +30,6 @@ import ( "github.com/rancher/opni/plugins/alerting/pkg/node_backend" "go.uber.org/zap" - capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" managementv1 "github.com/rancher/opni/pkg/apis/management/v1" "github.com/rancher/opni/pkg/logger" managementext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/management" @@ -68,7 +68,7 @@ type Plugin struct { mgmtClient future.Future[managementv1.ManagementClient] storageBackend future.Future[storage.Backend] capabilitySpecStore future.Future[node_backend.CapabilitySpecKV] - capabilityManager future.Future[capabilityv1.NodeManagerClient] + delegate future.Future[streamext.StreamDelegate[agent.ClientSet]] adminClient future.Future[cortexadmin.CortexAdminClient] cortexOpsClient future.Future[cortexops.CortexOpsClient] natsConn future.Future[*nats.Conn] @@ -114,7 +114,7 @@ func NewPlugin(ctx context.Context) *Plugin { mgmtClient: future.New[managementv1.ManagementClient](), storageBackend: future.New[storage.Backend](), capabilitySpecStore: future.New[node_backend.CapabilitySpecKV](), - capabilityManager: future.New[capabilityv1.NodeManagerClient](), + delegate: future.New[streamext.StreamDelegate[agent.ClientSet]](), adminClient: future.New[cortexadmin.CortexAdminClient](), cortexOpsClient: future.New[cortexops.CortexOpsClient](), @@ -160,14 +160,14 @@ func NewPlugin(ctx context.Context) *Plugin { p.storageBackend, p.mgmtClient, p.capabilitySpecStore, - p.capabilityManager, + p.delegate, func( storageBackend storage.Backend, mgmtClient managementv1.ManagementClient, specStore node_backend.CapabilitySpecKV, - capabilityManager capabilityv1.NodeManagerClient, + delegate streamext.StreamDelegate[agent.ClientSet], ) { - p.node.Initialize(specStore, mgmtClient, capabilityManager, storageBackend) + p.node.Initialize(specStore, mgmtClient, delegate, storageBackend) }, ) diff --git a/plugins/alerting/pkg/alerting/stream.go b/plugins/alerting/pkg/alerting/stream.go index 12c0ed4e7d..38f5247991 100644 --- a/plugins/alerting/pkg/alerting/stream.go +++ b/plugins/alerting/pkg/alerting/stream.go @@ -1,10 +1,12 @@ package alerting import ( + "github.com/rancher/opni/pkg/agent" "github.com/rancher/opni/pkg/capabilities/wellknown" streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream" "github.com/rancher/opni/plugins/alerting/pkg/apis/node" "github.com/rancher/opni/plugins/alerting/pkg/apis/rules" + "google.golang.org/grpc" ) func (p *Plugin) StreamServers() []streamext.Server { @@ -21,3 +23,7 @@ func (p *Plugin) StreamServers() []streamext.Server { }, } } + +func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) { + p.delegate.Set(streamext.NewDelegate(cc, agent.NewClientSet)) +} diff --git a/plugins/alerting/pkg/alerting/system.go b/plugins/alerting/pkg/alerting/system.go index 8783a5b8a3..3646b41a44 100644 --- a/plugins/alerting/pkg/alerting/system.go +++ b/plugins/alerting/pkg/alerting/system.go @@ -24,7 +24,6 @@ import ( alertingStorage "github.com/rancher/opni/pkg/alerting/storage" "github.com/rancher/opni/pkg/alerting/server" - capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" managementv1 "github.com/rancher/opni/pkg/apis/management/v1" "github.com/rancher/opni/pkg/config/v1beta1" "github.com/rancher/opni/pkg/machinery" @@ -195,11 +194,6 @@ func (p *Plugin) UseAPIExtensions(intf system.ExtensionClientInterface) { p.cortexOpsClient.Set(cortexops.NewCortexOpsClient(cc)) } -func (p *Plugin) UseNodeManagerClient(client capabilityv1.NodeManagerClient) { - p.capabilityManager.Set(client) - <-p.ctx.Done() -} - func (p *Plugin) handleDriverNotifications() { for { select { diff --git a/plugins/alerting/pkg/node_backend/backend.go b/plugins/alerting/pkg/node_backend/backend.go index f07ac004d3..a80cf8afbb 100644 --- a/plugins/alerting/pkg/node_backend/backend.go +++ b/plugins/alerting/pkg/node_backend/backend.go @@ -5,12 +5,14 @@ import ( "sync" "github.com/google/go-cmp/cmp" + "github.com/rancher/opni/pkg/agent" capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" corev1 "github.com/rancher/opni/pkg/apis/core/v1" managementv1 "github.com/rancher/opni/pkg/apis/management/v1" "github.com/rancher/opni/pkg/auth/cluster" "github.com/rancher/opni/pkg/capabilities" "github.com/rancher/opni/pkg/capabilities/wellknown" + streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream" "github.com/rancher/opni/pkg/storage" "github.com/rancher/opni/pkg/util" @@ -44,9 +46,9 @@ type AlertingNodeBackend struct { nodeStatusMu sync.RWMutex nodeStatus map[string]*capabilityv1.NodeCapabilityStatus - nodeManagerClient future.Future[capabilityv1.NodeManagerClient] - mgmtClient future.Future[managementv1.ManagementClient] - storageBackend future.Future[storage.Backend] + delegate future.Future[streamext.StreamDelegate[agent.ClientSet]] + mgmtClient future.Future[managementv1.ManagementClient] + storageBackend future.Future[storage.Backend] capabilityKV future.Future[CapabilitySpecKV] } @@ -55,24 +57,24 @@ func NewAlertingNodeBackend( lg *zap.SugaredLogger, ) *AlertingNodeBackend { return &AlertingNodeBackend{ - lg: lg, - nodeManagerClient: future.New[capabilityv1.NodeManagerClient](), - mgmtClient: future.New[managementv1.ManagementClient](), - storageBackend: future.New[storage.Backend](), - capabilityKV: future.New[CapabilitySpecKV](), - nodeStatus: make(map[string]*capabilityv1.NodeCapabilityStatus), + lg: lg, + delegate: future.New[streamext.StreamDelegate[agent.ClientSet]](), + mgmtClient: future.New[managementv1.ManagementClient](), + storageBackend: future.New[storage.Backend](), + capabilityKV: future.New[CapabilitySpecKV](), + nodeStatus: make(map[string]*capabilityv1.NodeCapabilityStatus), } } func (a *AlertingNodeBackend) Initialize( kv CapabilitySpecKV, mgmtClient managementv1.ManagementClient, - nodeManagerClient capabilityv1.NodeManagerClient, + nodeManagerClient streamext.StreamDelegate[agent.ClientSet], storageBackend storage.Backend, ) { a.InitOnce(func() { a.capabilityKV.Set(kv) - a.nodeManagerClient.Set(nodeManagerClient) + a.delegate.Set(nodeManagerClient) a.mgmtClient.Set(mgmtClient) a.storageBackend.Set(storageBackend) }) @@ -91,11 +93,8 @@ var FallbackDefaultNodeSpec = &node.AlertingCapabilitySpec{ } func (a *AlertingNodeBackend) requestNodeSync(ctx context.Context, node *corev1.Reference) { - _, err := a.nodeManagerClient.Get().RequestSync(ctx, &capabilityv1.SyncRequest{ - Cluster: node, - Filter: &capabilityv1.Filter{ - CapabilityNames: []string{wellknown.CapabilityAlerting}, - }, + _, err := a.delegate.Get().WithTarget(node).SyncNow(ctx, &capabilityv1.Filter{ + CapabilityNames: []string{wellknown.CapabilityAlerting}, }) name := node.GetId() if name == "" { diff --git a/plugins/logging/pkg/backend/logging.go b/plugins/logging/pkg/backend/logging.go index 7183bbefe9..ae80821cfb 100644 --- a/plugins/logging/pkg/backend/logging.go +++ b/plugins/logging/pkg/backend/logging.go @@ -2,15 +2,16 @@ package backend import ( "context" - "sync" - "slices" + "sync" + "github.com/rancher/opni/pkg/agent" capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" opnicorev1 "github.com/rancher/opni/pkg/apis/core/v1" managementv1 "github.com/rancher/opni/pkg/apis/management/v1" "github.com/rancher/opni/pkg/capabilities/wellknown" "github.com/rancher/opni/pkg/management" + streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream" "github.com/rancher/opni/pkg/storage" "github.com/rancher/opni/pkg/task" "github.com/rancher/opni/pkg/util" @@ -34,13 +35,13 @@ type LoggingBackend struct { } type LoggingBackendConfig struct { - Logger *zap.SugaredLogger `validate:"required"` - StorageBackend storage.Backend `validate:"required"` - MgmtClient managementv1.ManagementClient `validate:"required"` - NodeManagerClient capabilityv1.NodeManagerClient `validate:"required"` - UninstallController *task.Controller `validate:"required"` - OpensearchManager *opensearchdata.Manager `validate:"required"` - ClusterDriver driver.ClusterDriver `validate:"required"` + Logger *zap.SugaredLogger `validate:"required"` + StorageBackend storage.Backend `validate:"required"` + MgmtClient managementv1.ManagementClient `validate:"required"` + Delegate streamext.StreamDelegate[agent.ClientSet] `validate:"required"` + UninstallController *task.Controller `validate:"required"` + OpensearchManager *opensearchdata.Manager `validate:"required"` + ClusterDriver driver.ClusterDriver `validate:"required"` } var _ node.NodeLoggingCapabilityServer = (*LoggingBackend)(nil) diff --git a/plugins/logging/pkg/backend/sync.go b/plugins/logging/pkg/backend/sync.go index 46abc70ec5..69466e04d5 100644 --- a/plugins/logging/pkg/backend/sync.go +++ b/plugins/logging/pkg/backend/sync.go @@ -112,11 +112,8 @@ func (b *LoggingBackend) shouldDisableNode(ctx context.Context) bool { } func (b *LoggingBackend) requestNodeSync(ctx context.Context, cluster *opnicorev1.Reference) { - _, err := b.NodeManagerClient.RequestSync(ctx, &capabilityv1.SyncRequest{ - Cluster: cluster, - Filter: &capabilityv1.Filter{ - CapabilityNames: []string{wellknown.CapabilityLogs}, - }, + _, err := b.Delegate.WithTarget(cluster).SyncNow(ctx, &capabilityv1.Filter{ + CapabilityNames: []string{wellknown.CapabilityLogs}, }) name := cluster.GetId() diff --git a/plugins/logging/pkg/gateway/plugin.go b/plugins/logging/pkg/gateway/plugin.go index 37d8bebcc3..7387f11f1e 100644 --- a/plugins/logging/pkg/gateway/plugin.go +++ b/plugins/logging/pkg/gateway/plugin.go @@ -15,6 +15,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/client-go/rest" + "github.com/rancher/opni/pkg/agent" capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" managementv1 "github.com/rancher/opni/pkg/apis/management/v1" "github.com/rancher/opni/pkg/features" @@ -57,7 +58,7 @@ type Plugin struct { storageBackend future.Future[storage.Backend] kv future.Future[system.KeyValueStoreClient] mgmtApi future.Future[managementv1.ManagementClient] - nodeManagerClient future.Future[capabilityv1.NodeManagerClient] + delegate future.Future[streamext.StreamDelegate[agent.ClientSet]] uninstallController future.Future[*task.Controller] alertingServer *alerting.AlertingManagementServer opensearchManager *opensearchdata.Manager @@ -142,7 +143,7 @@ func NewPlugin(ctx context.Context, opts ...PluginOption) *Plugin { lg.Named("opensearch-manager"), kv, ), - nodeManagerClient: future.New[capabilityv1.NodeManagerClient](), + delegate: future.New[streamext.StreamDelegate[agent.ClientSet]](), otelForwarder: otel.NewOTELForwarder( otel.WithLogger(lg.Named("otel-forwarder")), otel.WithAddress(fmt.Sprintf( @@ -154,19 +155,19 @@ func NewPlugin(ctx context.Context, opts ...PluginOption) *Plugin { ), } - future.Wait4(p.storageBackend, p.mgmtApi, p.uninstallController, p.nodeManagerClient, + future.Wait4(p.storageBackend, p.mgmtApi, p.uninstallController, p.delegate, func( storageBackend storage.Backend, mgmtClient managementv1.ManagementClient, uninstallController *task.Controller, - nodeManagerClient capabilityv1.NodeManagerClient, + delegate streamext.StreamDelegate[agent.ClientSet], ) { p.logging.Initialize(backend.LoggingBackendConfig{ Logger: p.logger.Named("logging-backend"), StorageBackend: storageBackend, UninstallController: uninstallController, MgmtClient: mgmtClient, - NodeManagerClient: nodeManagerClient, + Delegate: delegate, OpensearchManager: p.opensearchManager, ClusterDriver: p.backendDriver, }) diff --git a/plugins/logging/pkg/gateway/stream.go b/plugins/logging/pkg/gateway/stream.go index 9be07198ed..fc2fe31a64 100644 --- a/plugins/logging/pkg/gateway/stream.go +++ b/plugins/logging/pkg/gateway/stream.go @@ -1,10 +1,12 @@ package gateway import ( + "github.com/rancher/opni/pkg/agent" "github.com/rancher/opni/pkg/capabilities/wellknown" streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream" "github.com/rancher/opni/plugins/logging/apis/node" collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1" + "google.golang.org/grpc" ) func (p *Plugin) StreamServers() []streamext.Server { @@ -21,3 +23,7 @@ func (p *Plugin) StreamServers() []streamext.Server { }, } } + +func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) { + p.delegate.Set(streamext.NewDelegate(cc, agent.NewClientSet)) +} diff --git a/plugins/logging/pkg/gateway/system.go b/plugins/logging/pkg/gateway/system.go index 386d63da92..bf68fc0988 100644 --- a/plugins/logging/pkg/gateway/system.go +++ b/plugins/logging/pkg/gateway/system.go @@ -4,7 +4,6 @@ import ( "context" "os" - capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" opnicorev1 "github.com/rancher/opni/pkg/apis/core/v1" managementv1 "github.com/rancher/opni/pkg/apis/management/v1" "github.com/rancher/opni/pkg/config/v1beta1" @@ -51,11 +50,6 @@ func (p *Plugin) UseManagementAPI(client managementv1.ManagementClient) { <-p.ctx.Done() } -func (p *Plugin) UseNodeManagerClient(client capabilityv1.NodeManagerClient) { - p.nodeManagerClient.Set(client) - <-p.ctx.Done() -} - func (p *Plugin) UseKeyValueStore(client system.KeyValueStoreClient) { p.kv.Set(client) ctrl, err := task.NewController(p.ctx, "uninstall", system.NewKVStoreClient[*opnicorev1.TaskStatus](client), &UninstallTaskRunner{ diff --git a/plugins/metrics/pkg/backend/capability.go b/plugins/metrics/pkg/backend/capability.go index bb7137de89..30d40604a9 100644 --- a/plugins/metrics/pkg/backend/capability.go +++ b/plugins/metrics/pkg/backend/capability.go @@ -14,6 +14,7 @@ import ( "github.com/rancher/opni/pkg/task" "github.com/rancher/opni/pkg/util" "github.com/rancher/opni/plugins/metrics/pkg/gateway/drivers" + "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" @@ -73,7 +74,12 @@ func (m *MetricsBackend) Install(ctx context.Context, req *v1.InstallRequest) (* return nil, err } - m.requestNodeSync(ctx, req.Cluster) + if err := m.requestNodeSync(ctx, req.Cluster); err != nil { + return &v1.InstallResponse{ + Status: v1.InstallResponseStatus_Warning, + Message: fmt.Errorf("sync request failed; agent may not be updated immediately: %v", err).Error(), + }, nil + } if warningErr != nil { return &v1.InstallResponse{ @@ -144,7 +150,7 @@ func (m *MetricsBackend) Uninstall(ctx context.Context, req *v1.UninstallRequest break } if !exists { - return nil, status.Error(codes.FailedPrecondition, "cluster does not have the reuqested capability") + return nil, status.Error(codes.FailedPrecondition, "cluster does not have the requested capability") } now := timestamppb.Now() @@ -159,7 +165,13 @@ func (m *MetricsBackend) Uninstall(ctx context.Context, req *v1.UninstallRequest if err != nil { return nil, fmt.Errorf("failed to update cluster metadata: %v", err) } - m.requestNodeSync(ctx, req.Cluster) + if err := m.requestNodeSync(ctx, req.Cluster); err != nil { + m.Logger.With( + zap.Error(err), + "agent", req.Cluster, + ).Warn("sync request failed; agent may not be updated immediately") + // continue; this is not a fatal error + } md := uninstall.TimestampedMetadata{ DefaultUninstallOptions: defaultOpts, @@ -184,7 +196,6 @@ func (m *MetricsBackend) CancelUninstall(ctx context.Context, cluster *corev1.Re m.UninstallController.CancelTask(cluster.Id) - m.requestNodeSync(ctx, cluster) return &emptypb.Empty{}, nil } diff --git a/plugins/metrics/pkg/backend/metrics.go b/plugins/metrics/pkg/backend/metrics.go index da642ba8e8..b5cb5579f7 100644 --- a/plugins/metrics/pkg/backend/metrics.go +++ b/plugins/metrics/pkg/backend/metrics.go @@ -2,9 +2,11 @@ package backend import ( "context" + "errors" "sync" "sync/atomic" + "github.com/rancher/opni/pkg/agent" "github.com/rancher/opni/pkg/config/v1beta1" "github.com/rancher/opni/plugins/metrics/apis/cortexops" "github.com/rancher/opni/plugins/metrics/apis/node" @@ -22,6 +24,7 @@ import ( capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" corev1 "github.com/rancher/opni/pkg/apis/core/v1" managementv1 "github.com/rancher/opni/pkg/apis/management/v1" + streamv1 "github.com/rancher/opni/pkg/apis/stream/v1" "github.com/rancher/opni/pkg/auth/cluster" "github.com/rancher/opni/pkg/capabilities/wellknown" "github.com/rancher/opni/pkg/storage" @@ -52,15 +55,19 @@ type MetricsBackend struct { var _ node.NodeMetricsCapabilityServer = (*MetricsBackend)(nil) var _ remoteread.RemoteReadGatewayServer = (*MetricsBackend)(nil) +type MetricsAgentClientSet interface { + agent.ClientSet + remoteread.RemoteReadAgentClient +} + type MetricsBackendConfig struct { - Logger *zap.SugaredLogger `validate:"required"` - StorageBackend storage.Backend `validate:"required"` - MgmtClient managementv1.ManagementClient `validate:"required"` - NodeManagerClient capabilityv1.NodeManagerClient `validate:"required"` - UninstallController *task.Controller `validate:"required"` - ClusterDriver drivers.ClusterDriver `validate:"required"` - Delegate streamext.StreamDelegate[remoteread.RemoteReadAgentClient] `validate:"required"` - KV *KVClients `validate:"required"` + Logger *zap.SugaredLogger `validate:"required"` + StorageBackend storage.Backend `validate:"required"` + MgmtClient managementv1.ManagementClient `validate:"required"` + UninstallController *task.Controller `validate:"required"` + ClusterDriver drivers.ClusterDriver `validate:"required"` + Delegate streamext.StreamDelegate[MetricsAgentClientSet] `validate:"required"` + KV *KVClients `validate:"required"` } type KVClients struct { @@ -80,30 +87,36 @@ func (m *MetricsBackend) Initialize(conf MetricsBackendConfig) { }) } -func (m *MetricsBackend) requestNodeSync(ctx context.Context, cluster *corev1.Reference) { - _, err := m.NodeManagerClient.RequestSync(ctx, &capabilityv1.SyncRequest{ - Cluster: cluster, - Filter: &capabilityv1.Filter{ - CapabilityNames: []string{wellknown.CapabilityMetrics}, - }, - }) +func (m *MetricsBackend) requestNodeSync(ctx context.Context, target *corev1.Reference) error { + _, err := m.Delegate. + WithTarget(target). + SyncNow(ctx, &capabilityv1.Filter{CapabilityNames: []string{wellknown.CapabilityMetrics}}) + return err +} - name := cluster.GetId() - if name == "" { - name = "(all)" - } - if err != nil { +func (m *MetricsBackend) broadcastNodeSync(ctx context.Context) { + // keep any metadata in the context, but don't propagate cancellation + ctx = context.WithoutCancel(ctx) + var errs []error + m.Delegate. + WithBroadcastSelector(&corev1.ClusterSelector{}, func(reply any, msg *streamv1.BroadcastReplyList) error { + for _, resp := range msg.GetResponses() { + err := resp.GetReply().GetResponse().GetStatus().Err() + if err != nil { + target := resp.GetRef() + errs = append(errs, status.Errorf(codes.Internal, "failed to sync agent %s: %v", target.GetId(), err)) + } + } + return nil + }). + SyncNow(ctx, &capabilityv1.Filter{ + CapabilityNames: []string{wellknown.CapabilityMetrics}, + }) + if len(errs) > 0 { m.Logger.With( - "cluster", name, - "capability", wellknown.CapabilityMetrics, - zap.Error(err), - ).Warn("failed to request node sync; nodes may not be updated immediately") - return + zap.Error(errors.Join(errs...)), + ).Warn("one or more agents failed to sync; they may not be updated immediately") } - m.Logger.With( - "cluster", name, - "capability", wellknown.CapabilityMetrics, - ).Info("node sync requested") } // Implements node.NodeMetricsCapabilityServer diff --git a/plugins/metrics/pkg/backend/node.go b/plugins/metrics/pkg/backend/node.go index a34f56842b..0dd6cb10f1 100644 --- a/plugins/metrics/pkg/backend/node.go +++ b/plugins/metrics/pkg/backend/node.go @@ -4,9 +4,12 @@ import ( "context" "github.com/google/go-cmp/cmp" + capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" corev1 "github.com/rancher/opni/pkg/apis/core/v1" - v1 "github.com/rancher/opni/pkg/apis/core/v1" + "github.com/rancher/opni/pkg/capabilities/wellknown" + "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream" "github.com/rancher/opni/plugins/metrics/apis/node" + "google.golang.org/grpc" "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/known/emptypb" ) @@ -16,12 +19,20 @@ type NodeServiceBackend struct { *MetricsBackend } +func (m *NodeServiceBackend) syncAllNodes() error { + _, err := m.Delegate.WithBroadcastSelector(&corev1.ClusterSelector{}, stream.DiscardReplies). + SyncNow(context.Background(), &capabilityv1.Filter{ + CapabilityNames: []string{wellknown.CapabilityMetrics}, + }, grpc.WaitForReady(false)) + return err +} + func (m *NodeServiceBackend) GetDefaultNodeConfiguration(ctx context.Context, _ *emptypb.Empty) (*node.MetricsCapabilitySpec, error) { m.WaitForInit() return m.getDefaultNodeSpec(ctx) } -func (m *NodeServiceBackend) GetNodeConfiguration(ctx context.Context, node *v1.Reference) (*node.MetricsCapabilitySpec, error) { +func (m *NodeServiceBackend) GetNodeConfiguration(ctx context.Context, node *corev1.Reference) (*node.MetricsCapabilitySpec, error) { m.WaitForInit() return m.getNodeSpecOrDefault(ctx, node.GetId()) } @@ -33,7 +44,7 @@ func (m *NodeServiceBackend) SetDefaultNodeConfiguration(ctx context.Context, co if err := m.KV.DefaultCapabilitySpec.Delete(ctx); err != nil { return nil, err } - m.requestNodeSync(ctx, &corev1.Reference{}) + m.broadcastNodeSync(ctx) return &emptypb.Empty{}, nil } if err := conf.Validate(); err != nil { @@ -42,7 +53,7 @@ func (m *NodeServiceBackend) SetDefaultNodeConfiguration(ctx context.Context, co if err := m.KV.DefaultCapabilitySpec.Put(ctx, conf); err != nil { return nil, err } - m.requestNodeSync(ctx, &corev1.Reference{}) + m.broadcastNodeSync(ctx) return &emptypb.Empty{}, nil } @@ -52,7 +63,7 @@ func (m *NodeServiceBackend) SetNodeConfiguration(ctx context.Context, req *node if err := m.KV.NodeCapabilitySpecs.Delete(ctx, req.Node.GetId()); err != nil { return nil, err } - m.requestNodeSync(ctx, req.Node) + return &emptypb.Empty{}, nil } if err := req.Spec.Validate(); err != nil { diff --git a/plugins/metrics/pkg/gateway/plugin.go b/plugins/metrics/pkg/gateway/plugin.go index 15a26326ed..4e149e41a5 100644 --- a/plugins/metrics/pkg/gateway/plugin.go +++ b/plugins/metrics/pkg/gateway/plugin.go @@ -4,7 +4,6 @@ import ( "context" "crypto/tls" - capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" managementv1 "github.com/rancher/opni/pkg/apis/management/v1" "github.com/rancher/opni/pkg/auth" "github.com/rancher/opni/pkg/config/v1beta1" @@ -50,13 +49,12 @@ type Plugin struct { config future.Future[*v1beta1.GatewayConfig] authMw future.Future[map[string]auth.Middleware] mgmtClient future.Future[managementv1.ManagementClient] - nodeManagerClient future.Future[capabilityv1.NodeManagerClient] storageBackend future.Future[storage.Backend] cortexTlsConfig future.Future[*tls.Config] cortexClientSet future.Future[cortex.ClientSet] uninstallController future.Future[*task.Controller] clusterDriver future.Future[drivers.ClusterDriver] - delegate future.Future[streamext.StreamDelegate[remoteread.RemoteReadAgentClient]] + delegate future.Future[streamext.StreamDelegate[backend.MetricsAgentClientSet]] backendKvClients future.Future[*backend.KVClients] } @@ -78,13 +76,12 @@ func NewPlugin(ctx context.Context) *Plugin { config: future.New[*v1beta1.GatewayConfig](), authMw: future.New[map[string]auth.Middleware](), mgmtClient: future.New[managementv1.ManagementClient](), - nodeManagerClient: future.New[capabilityv1.NodeManagerClient](), storageBackend: future.New[storage.Backend](), cortexTlsConfig: future.New[*tls.Config](), cortexClientSet: future.New[cortex.ClientSet](), uninstallController: future.New[*task.Controller](), clusterDriver: future.New[drivers.ClusterDriver](), - delegate: future.New[streamext.StreamDelegate[remoteread.RemoteReadAgentClient]](), + delegate: future.New[streamext.StreamDelegate[backend.MetricsAgentClientSet]](), backendKvClients: future.New[*backend.KVClients](), } p.metrics.OpsBackend = &backend.OpsServiceBackend{MetricsBackend: &p.metrics} @@ -149,21 +146,19 @@ func NewPlugin(ctx context.Context) *Plugin { ).Info("initialized cluster driver") p.clusterDriver.Set(driver) }) - future.Wait7(p.storageBackend, p.mgmtClient, p.nodeManagerClient, p.uninstallController, p.clusterDriver, p.delegate, p.backendKvClients, + future.Wait6(p.storageBackend, p.mgmtClient, p.uninstallController, p.clusterDriver, p.delegate, p.backendKvClients, func( storageBackend storage.Backend, mgmtClient managementv1.ManagementClient, - nodeManagerClient capabilityv1.NodeManagerClient, uninstallController *task.Controller, clusterDriver drivers.ClusterDriver, - delegate streamext.StreamDelegate[remoteread.RemoteReadAgentClient], + delegate streamext.StreamDelegate[backend.MetricsAgentClientSet], backendKvClients *backend.KVClients, ) { p.metrics.Initialize(backend.MetricsBackendConfig{ Logger: p.logger.Named("metrics-backend"), StorageBackend: storageBackend, MgmtClient: mgmtClient, - NodeManagerClient: nodeManagerClient, UninstallController: uninstallController, ClusterDriver: clusterDriver, Delegate: delegate, diff --git a/plugins/metrics/pkg/gateway/stream.go b/plugins/metrics/pkg/gateway/stream.go index 2ef19274ba..1e01c71198 100644 --- a/plugins/metrics/pkg/gateway/stream.go +++ b/plugins/metrics/pkg/gateway/stream.go @@ -3,6 +3,7 @@ package gateway import ( "context" + "github.com/rancher/opni/pkg/agent" "github.com/rancher/opni/pkg/auth/cluster" "github.com/rancher/opni/pkg/capabilities/wellknown" "github.com/rancher/opni/pkg/metrics" @@ -10,6 +11,7 @@ import ( "github.com/rancher/opni/plugins/metrics/apis/node" "github.com/rancher/opni/plugins/metrics/apis/remoteread" "github.com/rancher/opni/plugins/metrics/apis/remotewrite" + "github.com/rancher/opni/plugins/metrics/pkg/backend" "go.opentelemetry.io/otel/attribute" "google.golang.org/grpc" ) @@ -35,7 +37,16 @@ func (p *Plugin) StreamServers() []streamext.Server { } func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) { - p.delegate.Set(streamext.NewDelegate(cc, remoteread.NewRemoteReadAgentClient)) + type clientset struct { + agent.ClientSet + remoteread.RemoteReadAgentClient + } + p.delegate.Set(streamext.NewDelegate(cc, func(cci grpc.ClientConnInterface) backend.MetricsAgentClientSet { + return &clientset{ + ClientSet: agent.NewClientSet(cci), + RemoteReadAgentClient: remoteread.NewRemoteReadAgentClient(cci), + } + })) } func (p *Plugin) labelsForStreamMetrics(ctx context.Context) []attribute.KeyValue { diff --git a/plugins/metrics/pkg/gateway/system.go b/plugins/metrics/pkg/gateway/system.go index 9cd5b474d2..4b078e8941 100644 --- a/plugins/metrics/pkg/gateway/system.go +++ b/plugins/metrics/pkg/gateway/system.go @@ -10,7 +10,6 @@ import ( "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" - capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" corev1 "github.com/rancher/opni/pkg/apis/core/v1" managementv1 "github.com/rancher/opni/pkg/apis/management/v1" "github.com/rancher/opni/pkg/config/v1beta1" @@ -68,11 +67,6 @@ func (p *Plugin) UseManagementAPI(client managementv1.ManagementClient) { <-p.ctx.Done() } -func (p *Plugin) UseNodeManagerClient(client capabilityv1.NodeManagerClient) { - p.nodeManagerClient.Set(client) - <-p.ctx.Done() -} - func (p *Plugin) UseKeyValueStore(client system.KeyValueStoreClient) { ctrl, err := task.NewController(p.ctx, "uninstall", system.NewKVStoreClient[*corev1.TaskStatus](client), &p.uninstallRunner) if err != nil { diff --git a/plugins/topology/pkg/backend/topology.go b/plugins/topology/pkg/backend/topology.go index a41417d716..5fc621eca0 100644 --- a/plugins/topology/pkg/backend/topology.go +++ b/plugins/topology/pkg/backend/topology.go @@ -7,6 +7,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/rancher/opni/pkg/agent" capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" corev1 "github.com/rancher/opni/pkg/apis/core/v1" managementv1 "github.com/rancher/opni/pkg/apis/management/v1" @@ -14,6 +15,7 @@ import ( "github.com/rancher/opni/pkg/capabilities" "github.com/rancher/opni/pkg/capabilities/wellknown" "github.com/rancher/opni/pkg/machinery/uninstall" + streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream" "github.com/rancher/opni/pkg/storage" "github.com/rancher/opni/pkg/task" "github.com/rancher/opni/pkg/util" @@ -29,12 +31,12 @@ import ( ) type TopologyBackendConfig struct { - Logger *zap.SugaredLogger `validate:"required"` - StorageBackend storage.Backend `validate:"required"` - MgmtClient managementv1.ManagementClient `validate:"required"` - NodeManagerClient capabilityv1.NodeManagerClient `validate:"required"` - UninstallController *task.Controller `validate:"required"` - ClusterDriver drivers.ClusterDriver `validate:"required"` + Logger *zap.SugaredLogger `validate:"required"` + StorageBackend storage.Backend `validate:"required"` + MgmtClient managementv1.ManagementClient `validate:"required"` + Delegate streamext.StreamDelegate[agent.ClientSet] `validate:"required"` + UninstallController *task.Controller `validate:"required"` + ClusterDriver drivers.ClusterDriver `validate:"required"` } type TopologyBackend struct { @@ -109,11 +111,8 @@ func (t *TopologyBackend) canInstall(ctx context.Context) error { } func (t *TopologyBackend) requestNodeSync(ctx context.Context, cluster *corev1.Reference) { - _, err := t.NodeManagerClient.RequestSync(ctx, &capabilityv1.SyncRequest{ - Cluster: cluster, - Filter: &capabilityv1.Filter{ - CapabilityNames: []string{wellknown.CapabilityTopology}, - }, + _, err := t.Delegate.WithTarget(cluster).SyncNow(ctx, &capabilityv1.Filter{ + CapabilityNames: []string{wellknown.CapabilityTopology}, }) name := cluster.GetId() if name == "" { diff --git a/plugins/topology/pkg/topology/gateway/plugin.go b/plugins/topology/pkg/topology/gateway/plugin.go index 9a9d68f1b2..376c42ca37 100644 --- a/plugins/topology/pkg/topology/gateway/plugin.go +++ b/plugins/topology/pkg/topology/gateway/plugin.go @@ -8,7 +8,7 @@ import ( "context" "github.com/nats-io/nats.go" - capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" + "github.com/rancher/opni/pkg/agent" managementv1 "github.com/rancher/opni/pkg/apis/management/v1" "github.com/rancher/opni/pkg/config/v1beta1" "github.com/rancher/opni/pkg/logger" @@ -48,7 +48,7 @@ type Plugin struct { storageBackend future.Future[storage.Backend] gatewayConfig future.Future[*v1beta1.GatewayConfig] - nodeManagerClient future.Future[capabilityv1.NodeManagerClient] + delegate future.Future[streamext.StreamDelegate[agent.ClientSet]] uninstallController future.Future[*task.Controller] clusterDriver future.Future[drivers.ClusterDriver] } @@ -61,7 +61,7 @@ func NewPlugin(ctx context.Context) *Plugin { storage: future.New[ConfigStorageAPIs](), mgmtClient: future.New[managementv1.ManagementClient](), storageBackend: future.New[storage.Backend](), - nodeManagerClient: future.New[capabilityv1.NodeManagerClient](), + delegate: future.New[streamext.StreamDelegate[agent.ClientSet]](), uninstallController: future.New[*task.Controller](), clusterDriver: future.New[drivers.ClusterDriver](), topologyBackend: backend.TopologyBackend{}, @@ -81,18 +81,18 @@ func NewPlugin(ctx context.Context) *Plugin { }) p.logger.Debug("waiting for async requirements for starting topology backend") - future.Wait5(p.storageBackend, p.mgmtClient, p.nodeManagerClient, p.uninstallController, p.clusterDriver, + future.Wait5(p.storageBackend, p.mgmtClient, p.delegate, p.uninstallController, p.clusterDriver, func( storageBackend storage.Backend, mgmtClient managementv1.ManagementClient, - nodeManagerClient capabilityv1.NodeManagerClient, + delegate streamext.StreamDelegate[agent.ClientSet], uninstallController *task.Controller, clusterDriver drivers.ClusterDriver, ) { p.logger.With( "storageBackend", storageBackend, "mgmtClient", mgmtClient, - "nodeManagerClient", nodeManagerClient, + "delegate", delegate, "uninstallController", uninstallController, "clusterDriver", clusterDriver, ).Debug("async requirements for starting topology backend are ready") @@ -100,7 +100,7 @@ func NewPlugin(ctx context.Context) *Plugin { Logger: p.logger.Named("topology-backend"), StorageBackend: storageBackend, MgmtClient: mgmtClient, - NodeManagerClient: nodeManagerClient, + Delegate: delegate, UninstallController: uninstallController, ClusterDriver: clusterDriver, }) diff --git a/plugins/topology/pkg/topology/gateway/stream.go b/plugins/topology/pkg/topology/gateway/stream.go index 0cdb4326c3..412d2b29d6 100644 --- a/plugins/topology/pkg/topology/gateway/stream.go +++ b/plugins/topology/pkg/topology/gateway/stream.go @@ -1,10 +1,12 @@ package gateway import ( + "github.com/rancher/opni/pkg/agent" "github.com/rancher/opni/pkg/capabilities/wellknown" streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream" "github.com/rancher/opni/plugins/topology/apis/node" "github.com/rancher/opni/plugins/topology/apis/stream" + "google.golang.org/grpc" ) func (p *Plugin) StreamServers() []streamext.Server { @@ -21,3 +23,7 @@ func (p *Plugin) StreamServers() []streamext.Server { }, } } + +func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) { + p.delegate.Set(streamext.NewDelegate(cc, agent.NewClientSet)) +} diff --git a/plugins/topology/pkg/topology/gateway/system.go b/plugins/topology/pkg/topology/gateway/system.go index 0d645571f1..c9cc24f753 100644 --- a/plugins/topology/pkg/topology/gateway/system.go +++ b/plugins/topology/pkg/topology/gateway/system.go @@ -4,7 +4,6 @@ import ( "context" "os" - capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" corev1 "github.com/rancher/opni/pkg/apis/core/v1" managementv1 "github.com/rancher/opni/pkg/apis/management/v1" "github.com/rancher/opni/pkg/config/v1beta1" @@ -94,8 +93,3 @@ func (p *Plugin) UseKeyValueStore(client system.KeyValueStoreClient) { p.nc.Set(nc) <-p.ctx.Done() } - -func (p *Plugin) UseNodeManagerClient(client capabilityv1.NodeManagerClient) { - p.nodeManagerClient.Set(client) - <-p.ctx.Done() -}