Skip to content

Commit

Permalink
feat(CLI): adds force-sink flag (#84)
Browse files Browse the repository at this point in the history
* feat(CLI): adds force-stdout flag

Adds a flag to mutate the Sink configuration
to always be sent to standard output.

* chore: add .DS_Store to gitignore

* fixup! feat(CLI): adds force-stdout flag

* style: GetConfig -> Config
  • Loading branch information
shellcromancer committed Mar 8, 2023
1 parent c809f65 commit cb7e697
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 8 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ playground.wasm.gz
wasm_exec.js
build/playground/substation_logo.png

# Ignore macOS system files
.DS_Store

# Archive files
*.zip

Expand Down
11 changes: 11 additions & 0 deletions cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package cmd

import (
"bytes"
"context"
"encoding/json"
"io"
Expand Down Expand Up @@ -81,6 +82,16 @@ func (sub *substation) SetConfig(r io.Reader) error {
return nil
}

// Config retreives the configuration of the app.
func (sub *substation) Config() (io.Reader, error) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(sub.config); err != nil {
return nil, err
}

return &buf, nil
}

// Concurrency returns the concurrency setting of the app.
func (sub *substation) Concurrency() int {
return sub.concurrency
Expand Down
71 changes: 63 additions & 8 deletions cmd/development/substation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,31 @@ import (
"fmt"
"io"
"os"
"strings"
"sync"
"time"

"golang.org/x/sync/errgroup"

"github.com/brexhq/substation/cmd"
"github.com/brexhq/substation/config"
"github.com/brexhq/substation/internal/bufio"
"github.com/brexhq/substation/internal/file"
"golang.org/x/sync/errgroup"
"github.com/brexhq/substation/internal/json"
)

type metadata struct {
Name string `json:"name"`
Size int64 `json:"size"`
}

type options struct {
Input string
Config string

ForceSink string
}

// getConfig contextually retrieves a Substation configuration.
func getConfig(ctx context.Context, cfg string) (io.Reader, error) {
path, err := file.Get(ctx, cfg)
Expand All @@ -46,24 +56,27 @@ func getConfig(ctx context.Context, cfg string) (io.Reader, error) {
}

func main() {
input := flag.String("input", "", "file to parse")
config := flag.String("config", "", "Substation configuration file")
var opts options

timeout := flag.Duration("timeout", 10*time.Second, "timeout")
flag.StringVar(&opts.Input, "input", "", "file to parse")
flag.StringVar(&opts.Config, "config", "", "Substation configuration file")
flag.StringVar(&opts.ForceSink, "force-sink", "", "force sink output to value (supported: stdout)")
flag.Parse()

ctx, cancel := context.WithTimeout(context.Background(), *timeout)
defer cancel()

if err := run(ctx, *input, *config); err != nil {
if err := run(ctx, opts); err != nil {
panic(fmt.Errorf("main: %v", err))
}
}

func run(ctx context.Context, input, cfg string) error {
func run(ctx context.Context, opts options) error {
sub := cmd.New()

// load configuration file
c, err := getConfig(ctx, cfg)
c, err := getConfig(ctx, opts.Config)
if err != nil {
return fmt.Errorf("run: %v", err)
}
Expand All @@ -72,6 +85,22 @@ func run(ctx context.Context, input, cfg string) error {
return fmt.Errorf("run: %v", err)
}

if opts.ForceSink != "" {
c, err = sub.Config()
if err != nil {
return fmt.Errorf("run: %v", err)
}

newConfig, err := mutateSink(c, opts.ForceSink)
if err != nil {
return fmt.Errorf("run: %v", err)
}

if err := sub.SetConfig(newConfig); err != nil {
return fmt.Errorf("run: %v", err)
}
}

group, ctx := errgroup.WithContext(ctx)

var sinkWg sync.WaitGroup
Expand All @@ -90,7 +119,7 @@ func run(ctx context.Context, input, cfg string) error {

// ingest
group.Go(func() error {
fi, err := file.Get(ctx, input)
fi, err := file.Get(ctx, opts.Input)
if err != nil {
return err
}
Expand All @@ -109,7 +138,7 @@ func run(ctx context.Context, input, cfg string) error {

capsule := config.NewCapsule()
if _, err = capsule.SetMetadata(metadata{
input,
opts.Input,
fs.Size(),
}); err != nil {
return fmt.Errorf("run: %v", err)
Expand Down Expand Up @@ -150,3 +179,29 @@ func run(ctx context.Context, input, cfg string) error {

return nil
}

func mutateSink(cfg io.Reader, forceSink string) (*bytes.Reader, error) {
oldConfig, err := io.ReadAll(cfg)
if err != nil {
return nil, fmt.Errorf("run: %v", err)
}

var r *bytes.Reader

switch {
case forceSink == "stdout":
newConfig, err := json.Set(oldConfig, "sink.type", forceSink)
if err != nil {
return nil, fmt.Errorf("run: %v", err)
}
r = bytes.NewReader(newConfig)
case strings.HasPrefix(forceSink, "http://"):
return nil, fmt.Errorf("-force-sink http://* not yet implemented")
case strings.HasPrefix(forceSink, "s3://"):
return nil, fmt.Errorf("-force-sink s3://* not yet implemented")
default:
return nil, fmt.Errorf("%q not supported for -force-sink", forceSink)
}

return r, nil
}

0 comments on commit cb7e697

Please sign in to comment.