Skip to content

Commit

Permalink
Replace node sync service with stream delegates
Browse files Browse the repository at this point in the history
  • Loading branch information
kralicky committed Sep 13, 2023
1 parent b4160ce commit e6c86ba
Show file tree
Hide file tree
Showing 32 changed files with 250 additions and 664 deletions.
46 changes: 18 additions & 28 deletions pkg/apis/capability/v1/capability.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 0 additions & 5 deletions pkg/apis/capability/v1/capability.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package capability;

import "github.com/kralicky/totem/extensions.proto";
import "github.com/rancher/opni/pkg/apis/core/v1/core.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";
Expand Down Expand Up @@ -55,10 +54,6 @@ service Node {
}
}

service NodeManager {
rpc RequestSync(SyncRequest) returns (google.protobuf.Empty);
}

message Details {
string name = 1;
string source = 2;
Expand Down
90 changes: 0 additions & 90 deletions pkg/apis/capability/v1/capability_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 1 addition & 10 deletions pkg/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

bootstrapv1 "github.com/rancher/opni/pkg/apis/bootstrap/v1"
bootstrapv2 "github.com/rancher/opni/pkg/apis/bootstrap/v2"
capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1"
controlv1 "github.com/rancher/opni/pkg/apis/control/v1"
corev1 "github.com/rancher/opni/pkg/apis/core/v1"
streamv1 "github.com/rancher/opni/pkg/apis/stream/v1"
Expand Down Expand Up @@ -66,7 +65,6 @@ type Gateway struct {

storageBackend storage.Backend
capBackendStore capabilities.BackendStore
syncRequester *SyncRequester
}

type GatewayOptions struct {
Expand Down Expand Up @@ -270,10 +268,9 @@ func NewGateway(ctx context.Context, conf *config.GatewayConfig, pl plugins.Load
// set up stream server
listener := health.NewListener()
monitor := health.NewMonitor(health.WithLogger(lg.Named("monitor")))
sync := NewSyncRequester(lg)
delegate := NewDelegateServer(storageBackend, lg)
// set up agent connection handlers
agentHandler := MultiConnectionHandler(listener, sync, delegate)
agentHandler := MultiConnectionHandler(listener, delegate)

go monitor.Run(ctx, listener)
streamSvc := NewStreamServer(agentHandler, storageBackend, httpServer.metricsRegisterer, lg)
Expand Down Expand Up @@ -302,7 +299,6 @@ func NewGateway(ctx context.Context, conf *config.GatewayConfig, pl plugins.Load
httpServer: httpServer,
grpcServer: grpcServer,
statusQuerier: monitor,
syncRequester: sync,
}

waitctx.Go(ctx, func() {
Expand Down Expand Up @@ -367,11 +363,6 @@ func (g *Gateway) CapabilitiesStore() capabilities.BackendStore {
return g.capBackendStore
}

// Implements management.CapabilitiesDataSource
func (g *Gateway) NodeManagerServer() capabilityv1.NodeManagerServer {
return g.syncRequester
}

// Implements management.HealthStatusDataSource
func (g *Gateway) GetClusterHealthStatus(ref *corev1.Reference) (*corev1.HealthStatus, error) {
hs := g.statusQuerier.GetHealthStatus(ref.Id)
Expand Down
119 changes: 0 additions & 119 deletions pkg/gateway/sync.go

This file was deleted.

8 changes: 0 additions & 8 deletions pkg/management/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1"
corev1 "github.com/rancher/opni/pkg/apis/core/v1"
managementv1 "github.com/rancher/opni/pkg/apis/management/v1"
"github.com/rancher/opni/pkg/caching"
Expand Down Expand Up @@ -52,7 +51,6 @@ type CoreDataSource interface {
// server needs to serve capabilities-related endpoints
type CapabilitiesDataSource interface {
CapabilitiesStore() capabilities.BackendStore
NodeManagerServer() capabilityv1.NodeManagerServer
}

type HealthStatusDataSource interface {
Expand Down Expand Up @@ -148,15 +146,9 @@ func NewServer(
otelgrpc.UnaryServerInterceptor()),
)
managementv1.RegisterManagementServer(m.grpcServer, m)
if m.capabilitiesDataSource != nil {
capabilityv1.RegisterNodeManagerServer(m.grpcServer, m.capabilitiesDataSource.NodeManagerServer())
}

pluginLoader.Hook(hooks.OnLoadM(func(sp types.SystemPlugin, md meta.PluginMeta) {
go sp.ServeManagementAPI(m)
if m.capabilitiesDataSource != nil {
go sp.ServeNodeManagerServer(m.capabilitiesDataSource.NodeManagerServer())
}
go func() {
if err := sp.ServeAPIExtensions(m.config.GRPCListenAddress); err != nil {
lg.With(
Expand Down
Loading

0 comments on commit e6c86ba

Please sign in to comment.