Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PoC k8s plugin start entrypoint #5400

Merged
merged 2 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/app/pipedv1/plugin/kubernetes/deployment/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,15 @@ type DeploymentService struct {

// NewDeploymentService creates a new planService.
func NewDeploymentService(
config *config.PipedPlugin,
logger *zap.Logger,
toolClient toolClient,
logPersister logPersister,
) *DeploymentService {
toolRegistry := toolregistry.NewRegistry(toolClient)

return &DeploymentService{
pluginConfig: config,
logger: logger.Named("planner"),
toolRegistry: toolRegistry,
loader: provider.NewLoader(toolRegistry),
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/pipedv1/plugin/kubernetes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func main() {
"Plugin component to deploy Kubernetes Application.",
)
app.AddCommands(
NewServerCommand(),
NewPluginCommand(),
)
if err := app.Run(); err != nil {
log.Fatal(err)
Expand Down
76 changes: 55 additions & 21 deletions pkg/app/pipedv1/plugin/kubernetes/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,47 +16,48 @@ package main

import (
"context"
"net"
"strconv"
"net/http"
"net/http/pprof"
"time"

"github.com/pipe-cd/pipecd/pkg/admin"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/deployment"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/toolregistry"
"github.com/pipe-cd/pipecd/pkg/cli"
config "github.com/pipe-cd/pipecd/pkg/configv1"
"github.com/pipe-cd/pipecd/pkg/plugin/logpersister"
"github.com/pipe-cd/pipecd/pkg/plugin/pipedapi"
"github.com/pipe-cd/pipecd/pkg/rpc"
"github.com/pipe-cd/pipecd/pkg/version"
"github.com/spf13/cobra"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

type server struct {
apiPort int
pipedPluginServicePort int
gracePeriod time.Duration
tls bool
certFile string
keyFile string
type plugin struct {
pipedPluginService string
gracePeriod time.Duration
tls bool
certFile string
keyFile string
config string

enableGRPCReflection bool
}

// NewServerCommand creates a new cobra command for executing api server.
func NewServerCommand() *cobra.Command {
s := &server{
apiPort: 10000,
// NewPluginCommand creates a new cobra command for executing api server.
func NewPluginCommand() *cobra.Command {
s := &plugin{
gracePeriod: 30 * time.Second,
}
cmd := &cobra.Command{
Use: "server",
Short: "Start running server.",
Use: "start",
Short: "Start running the kubernetes-plugin.",
RunE: cli.WithContext(s.run),
}

cmd.Flags().IntVar(&s.apiPort, "api-port", s.apiPort, "The port number used to run a grpc server for external apis.")
cmd.Flags().IntVar(&s.pipedPluginServicePort, "piped-plugin-service-port", s.pipedPluginServicePort, "The port number used to connect to the piped plugin service.") // TODO: we should discuss about the name of this flag, or we should use environment variable instead.
cmd.MarkFlagRequired("piped-plugin-service-port")
cmd.Flags().StringVar(&s.pipedPluginService, "piped-plugin-service", s.pipedPluginService, "The port number used to connect to the piped plugin service.") // TODO: we should discuss about the name of this flag, or we should use environment variable instead.
cmd.Flags().StringVar(&s.config, "config", s.config, "The configuration for the plugin.")
cmd.Flags().DurationVar(&s.gracePeriod, "grace-period", s.gracePeriod, "How long to wait for graceful shutdown.")

cmd.Flags().BoolVar(&s.tls, "tls", s.tls, "Whether running the gRPC server with TLS or not.")
Expand All @@ -66,32 +67,65 @@ func NewServerCommand() *cobra.Command {
// For debugging early in development
cmd.Flags().BoolVar(&s.enableGRPCReflection, "enable-grpc-reflection", s.enableGRPCReflection, "Whether to enable the reflection service or not.")

cmd.MarkFlagRequired("piped-plugin-service")
cmd.MarkFlagRequired("config")

return cmd
}

func (s *server) run(ctx context.Context, input cli.Input) (runErr error) {
func (s *plugin) run(ctx context.Context, input cli.Input) (runErr error) {
// Make a cancellable context.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

group, ctx := errgroup.WithContext(ctx)

pipedapiClient, err := pipedapi.NewClient(ctx, net.JoinHostPort("localhost", strconv.Itoa(s.pipedPluginServicePort)), nil)
pipedapiClient, err := pipedapi.NewClient(ctx, s.pipedPluginService, nil)
if err != nil {
input.Logger.Error("failed to create piped plugin service client", zap.Error(err))
return err
}

// Load the configuration.
cfg, err := config.ParsePluginConfig(s.config)
if err != nil {
input.Logger.Error("failed to parse the configuration", zap.Error(err))
return err
}

// Start running admin server.
{
var (
ver = []byte(version.Get().Version) // TODO: get the plugin's version
admin = admin.NewAdmin(0, s.gracePeriod, input.Logger) // TODO: add config for admin port
)

admin.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
w.Write(ver)
})
admin.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("ok"))
})
admin.HandleFunc("/debug/pprof/", pprof.Index)
admin.HandleFunc("/debug/pprof/profile", pprof.Profile)
admin.HandleFunc("/debug/pprof/trace", pprof.Trace)

group.Go(func() error {
return admin.Run(ctx)
})
}

// Start a gRPC server for handling external API requests.
{
var (
service = deployment.NewDeploymentService(
cfg,
input.Logger,
toolregistry.NewToolRegistry(pipedapiClient),
logpersister.NewPersister(pipedapiClient, input.Logger),
)
opts = []rpc.Option{
rpc.WithPort(s.apiPort),
rpc.WithPort(cfg.Port),
rpc.WithGracePeriod(s.gracePeriod),
rpc.WithLogger(input.Logger),
rpc.WithLogUnaryInterceptor(input.Logger),
Expand Down
12 changes: 12 additions & 0 deletions pkg/configv1/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -1314,6 +1314,18 @@ type PipedDeployTarget struct {
Config json.RawMessage `json:"config"`
}

// ParsePluginConfig parses the given JSON string and returns the PipedPlugin.
func ParsePluginConfig(s string) (*PipedPlugin, error) {
p := &PipedPlugin{}
if err := json.Unmarshal([]byte(s), p); err != nil {
return nil, err
}
if err := p.Validate(); err != nil {
return nil, err
}
return p, nil
}

func (p *PipedPlugin) Validate() error {
if p.Name == "" {
return errors.New("name must be set")
Expand Down
Loading