From 2b2e29cf403482c0ee1402197070a0a1a6f406f9 Mon Sep 17 00:00:00 2001 From: Patryk Strusiewicz-Surmacki Date: Tue, 4 Jun 2024 12:51:43 +0200 Subject: [PATCH] Added separate gRPC client Signed-off-by: Patryk Strusiewicz-Surmacki --- cmd/agent/main.go | 13 +++++---- cmd/manager/main.go | 24 +++++++++-------- pkg/clients/grpc/client.go | 54 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 73 insertions(+), 18 deletions(-) create mode 100644 pkg/clients/grpc/client.go diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 73ac35ec..1949f079 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -36,26 +36,25 @@ func main() { log.Info("agent's port", "port", port) - var err error - anycastTracker := anycast.NewTracker(&nl.Toolkit{}) + var err error var adapter agent.Adapter switch agentType { - case "netconf": + case "vrf-igbp": + adapter, err = vrfigbpadapter.New(anycastTracker, log) + default: log.Error(fmt.Errorf("agent is currently not supported"), "type", agentType) os.Exit(1) - default: - adapter, err = vrfigbpadapter.New(anycastTracker, log) } - log.Info("created adapter", "type", agentType) - if err != nil { log.Error(err, "error creating adapter") os.Exit(1) } + log.Info("created adapter", "type", agentType) + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { log.Error(err, "error on listening start") diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 365f06d9..74d4aafd 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -23,7 +23,6 @@ import ( "fmt" "os" "sort" - "strconv" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -32,10 +31,10 @@ import ( "github.com/telekom/das-schiff-network-operator/api/v1alpha1" "github.com/telekom/das-schiff-network-operator/controllers" - vrfigbpadapter "github.com/telekom/das-schiff-network-operator/pkg/adapters/vrf_igbp" "github.com/telekom/das-schiff-network-operator/pkg/agent" "github.com/telekom/das-schiff-network-operator/pkg/anycast" "github.com/telekom/das-schiff-network-operator/pkg/bpf" + grpcclient "github.com/telekom/das-schiff-network-operator/pkg/clients/grpc" "github.com/telekom/das-schiff-network-operator/pkg/config" "github.com/telekom/das-schiff-network-operator/pkg/healthcheck" "github.com/telekom/das-schiff-network-operator/pkg/macvlan" @@ -98,6 +97,7 @@ func main() { var nodeConfigPath string var agentType string var agentPort int + var agentAddr string flag.StringVar(&configFile, "config", "", "The controller will load its initial configuration from this file. "+ "Omit this flag to use the default configuration values. "+ @@ -109,8 +109,8 @@ func main() { flag.StringVar(&nodeConfigPath, "nodeconfig-path", reconciler.DefaultNodeConfigPath, "Path to store working node configuration.") flag.StringVar(&agentType, "agent", "vrf-igbp", "Use selected agent type (default: vrf-igbp).") - flag.IntVar(&agentPort, "agentPort", agent.DefaultPort, - "gRPC agent port. (default: "+strconv.Itoa(agent.DefaultPort)+")") + flag.StringVar(&agentAddr, "agentAddr", "", "Agent's address (default: '').") + flag.IntVar(&agentPort, "agentPort", agent.DefaultPort, fmt.Sprintf("Agent's port (default: %d).", agent.DefaultPort)) opts := zap.Options{ Development: true, } @@ -139,14 +139,16 @@ func main() { var agentClient agent.Client switch agentType { - case "netconf": - setupLog.Error(fmt.Errorf("netconf agent is currently not supported"), "unsupported error") - os.Exit(1) + case "vrf-igbp": + agentClient, err = grpcclient.NewClient(fmt.Sprintf("%s:%d", agentAddr, agentPort)) default: - agentClient, err = vrfigbpadapter.NewClient(fmt.Sprintf(":%d", agentPort)) - if err != nil { - setupLog.Error(err, "error creating agent's client") - } + setupLog.Error(fmt.Errorf("agent %s is currently not supported", agentType), "unsupported error") + os.Exit(1) + } + + if err != nil { + setupLog.Error(err, "error creating agent's client") + os.Exit(1) } if err := start(&options, onlyBPFMode, nodeConfigPath, interfacePrefix, agentClient); err != nil { diff --git a/pkg/clients/grpc/client.go b/pkg/clients/grpc/client.go new file mode 100644 index 00000000..e24e8003 --- /dev/null +++ b/pkg/clients/grpc/client.go @@ -0,0 +1,54 @@ +package clients + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/telekom/das-schiff-network-operator/api/v1alpha1" + "github.com/telekom/das-schiff-network-operator/pkg/agent" + agentpb "github.com/telekom/das-schiff-network-operator/pkg/agent/pb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +const defaultTimeout = 30 * time.Second + +type Client struct { + agentpb.AgentClient +} + +func NewClient(address string) (agent.Client, error) { + var grpcOpts []grpc.DialOption + grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.NewClient(address, grpcOpts...) + if err != nil { + return nil, fmt.Errorf("unable to create gRPC connection: %w", err) + } + + vrfigbpClient := Client{agentpb.NewAgentClient(conn)} + + return &vrfigbpClient, nil +} + +func (c *Client) SendConfig(ctx context.Context, nodeConfig *v1alpha1.NodeConfig) error { + timeoutCtx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + + nc := agentpb.NetworkConfiguration{ + Data: []byte{}, + } + data, err := json.Marshal(*nodeConfig) + if err != nil { + return fmt.Errorf("error marshaling NodeConfig: %w", err) + } + + nc.Data = data + + if _, err = c.SetConfiguration(timeoutCtx, &nc); err != nil { + return fmt.Errorf("error setting configuration: %w", err) + } + + return nil +}