Skip to content

Commit

Permalink
More refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
mumoshu committed Oct 29, 2020
1 parent 56fd57e commit 34f17a7
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 96 deletions.
56 changes: 8 additions & 48 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
package variant

import (
"bytes"
"fmt"
"io"
"os"

"github.com/mumoshu/variant2/pkg/controller"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"io"
)

func RunMain(env Env, opts ...Option) error {
Expand All @@ -27,50 +22,15 @@ func RunMain(env Env, opts ...Option) error {

if controller.RunRequested() {
return controller.Run(func(args []string) (string, error) {
buf := &bytes.Buffer{}

outRead, outWrite := io.Pipe()
outBuf := io.TeeReader(outRead, buf)

errRead, errWrite := io.Pipe()
errBuf := io.TeeReader(errRead, buf)

eg := &errgroup.Group{}

eg.Go(func() error {
if _, err := io.Copy(os.Stdout, outBuf); err != nil {
return xerrors.Errorf("copying to stdout: %w", err)
}

return nil
out, err := controller.CaptureOutput(func(stdout, stderr io.Writer) error {
return m.Run(args, RunOptions{
Stdout: stdout,
Stderr: stdout,
DisableLocking: false,
})
})

eg.Go(func() error {
if _, err := io.Copy(os.Stderr, errBuf); err != nil {
return xerrors.Errorf("copying to stderr: %w", err)
}

return nil
})

err := m.Run(args, RunOptions{
Stdout: outWrite,
Stderr: errWrite,
DisableLocking: false,
})

outWrite.Close()
errWrite.Close()

if egErr := eg.Wait(); egErr != nil {
panic(egErr)
}

if err != nil {
return "", xerrors.Errorf("running command: %w", err)
}

return buf.String(), err
return out, err
})
}

Expand Down
52 changes: 52 additions & 0 deletions pkg/controller/capture.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package controller

import (
"bytes"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"io"
"os"
)

func CaptureOutput(f func(io.Writer, io.Writer) error) (string, error) {
buf := &bytes.Buffer{}

outRead, outWrite := io.Pipe()
outBuf := io.TeeReader(outRead, buf)

errRead, errWrite := io.Pipe()
errBuf := io.TeeReader(errRead, buf)

eg := &errgroup.Group{}

eg.Go(func() error {
if _, err := io.Copy(os.Stdout, outBuf); err != nil {
return xerrors.Errorf("copying to stdout: %w", err)
}

return nil
})

eg.Go(func() error {
if _, err := io.Copy(os.Stderr, errBuf); err != nil {
return xerrors.Errorf("copying to stderr: %w", err)
}

return nil
})

err := f(outWrite, errWrite)

outWrite.Close()
errWrite.Close()

if egErr := eg.Wait(); egErr != nil {
panic(egErr)
}

if err != nil {
return "", xerrors.Errorf("running command: %w", err)
}

return buf.String(), nil
}
67 changes: 67 additions & 0 deletions pkg/controller/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package controller

import (
"fmt"
"os"
"strings"
)

type Config struct {
controllerName string
resyncPeriod string
forKind string
group string
version string
jobOnApply string
jobOnDestroy string
}

func getConfigFromEnv() (*Config, error) {
getEnv := func(n string) (string, string) {
name := EnvPrefix + n
value := os.Getenv(name)

return name, value
}

controllerNameEnv, controllerName := getEnv("NAME")
if controllerName == "" {
return nil, fmt.Errorf("missing required environment variable: %s", controllerNameEnv)
}

_, forAPIVersion := getEnv("FOR_API_VERSION")
if forAPIVersion == "" {
forAPIVersion = coreGroup + "/" + coreVersion
}

_, forKind := getEnv("FOR_KIND")
if forKind == "" {
forKind = "Resource"
}

_, resyncPeriod := getEnv("RESYNC_PERIOD")

groupVersion := strings.Split(forAPIVersion, "/")
group := groupVersion[0]
version := groupVersion[1]

jobOnApplyEnv, jobOnApply := getEnv("JOB_ON_APPLY")
if jobOnApply == "" {
return nil, fmt.Errorf("missing required environment variable: %s", jobOnApplyEnv)
}

jobOnDestroyEnv, jobOnDestroy := getEnv("JOB_ON_DESTROY")
if jobOnDestroy == "" {
return nil, fmt.Errorf("missing required environment variable: %s", jobOnDestroyEnv)
}

return &Config{
controllerName: controllerName,
resyncPeriod: resyncPeriod,
forKind: forKind,
group: group,
version: version,
jobOnApply: jobOnApply,
jobOnDestroy: jobOnDestroy,
}, nil
}
62 changes: 14 additions & 48 deletions pkg/controller/run.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package controller

import (
"fmt"
"os"
"strings"

Expand Down Expand Up @@ -48,42 +47,9 @@ func Run(run func([]string) (string, error)) (finalErr error) {
return xerrors.Errorf("getting kubernetes client config: %w", err)
}

getEnv := func(n string) (string, string) {
name := EnvPrefix + n
value := os.Getenv(name)

return name, value
}

controllerNameEnv, controllerName := getEnv("NAME")
if controllerName == "" {
return fmt.Errorf("missing required environment variable: %s", controllerNameEnv)
}

_, forAPIVersion := getEnv("FOR_API_VERSION")
if forAPIVersion == "" {
forAPIVersion = coreGroup + "/" + coreVersion
}

_, forKind := getEnv("FOR_KIND")
if forKind == "" {
forKind = "Resource"
}

_, resyncPeriod := getEnv("RESYNC_PERIOD")

groupVersion := strings.Split(forAPIVersion, "/")
group := groupVersion[0]
version := groupVersion[1]

jobOnApplyEnv, jobOnApply := getEnv("JOB_ON_APPLY")
if jobOnApply == "" {
return fmt.Errorf("missing required environment variable: %s", jobOnApplyEnv)
}

jobOnDestroyEnv, jobOnDestroy := getEnv("JOB_ON_DESTROY")
if jobOnDestroy == "" {
return fmt.Errorf("missing required environment variable: %s", jobOnDestroyEnv)
conf, err := getConfigFromEnv()
if err != nil {
return xerrors.Errorf("getting config from envvars: %w", err)
}

podName, err := os.Hostname()
Expand All @@ -92,33 +58,33 @@ func Run(run func([]string) (string, error)) (finalErr error) {
}

ctl := &controller{
log: logf.Log.WithName(controllerName),
log: logf.Log.WithName(conf.controllerName),
runtimeClient: nil,
run: run,
podName: podName,
controllerName: controllerName,
controllerName: conf.controllerName,
}

handle := func(st *state.State, job string) (finalErr error) {
return ctl.do(job, st.Object)
}

applyHandler := StateHandlerFunc(func(st *state.State) error {
return handle(st, jobOnApply)
return handle(st, conf.jobOnApply)
})

destroyHandler := StateHandlerFunc(func(st *state.State) error {
return handle(st, jobOnDestroy)
return handle(st, conf.jobOnDestroy)
})

c := &config.Config{
Name: controllerName,
whiteboxConfig := &config.Config{
Name: conf.controllerName,
Resources: []*config.ResourceConfig{
{
GroupVersionKind: schema.GroupVersionKind{
Group: group,
Version: version,
Kind: forKind,
Group: conf.group,
Version: conf.version,
Kind: conf.forKind,
},
Reconciler: &config.ReconcilerConfig{
HandlerConfig: config.HandlerConfig{
Expand All @@ -128,13 +94,13 @@ func Run(run func([]string) (string, error)) (finalErr error) {
Finalizer: &config.HandlerConfig{
StateHandler: destroyHandler,
},
ResyncPeriod: resyncPeriod,
ResyncPeriod: conf.resyncPeriod,
},
},
Webhook: nil,
}

mgr, err := manager.New(c, kc)
mgr, err := manager.New(whiteboxConfig, kc)
if err != nil {
return xerrors.Errorf("creating controller-manager: %w", err)
}
Expand Down

0 comments on commit 34f17a7

Please sign in to comment.