Skip to content

Commit

Permalink
Merge branch 'bugfix/issue-138'
Browse files Browse the repository at this point in the history
Closes: #138

Co-authored-by: Roberto Abdelkader Martínez Pérez <robertomartinezp@gmail.com>
  • Loading branch information
panchoh and nilp0inter committed Dec 15, 2020
2 parents 847c1bd + 4546fc6 commit dbd1a6a
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 70 deletions.
22 changes: 2 additions & 20 deletions internal/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/spf13/cobra"

"github.com/BBVA/kapow/internal/logger"
"github.com/BBVA/kapow/internal/server"
)

Expand All @@ -47,7 +46,7 @@ var ServerCmd = &cobra.Command{

sConf.ClientAuth, _ = cmd.Flags().GetBool("clientauth")
sConf.ClientCaFile, _ = cmd.Flags().GetString("clientcafile")
debug, _ := cmd.Flags().GetBool("debug")
sConf.Debug, _ = cmd.Flags().GetBool("debug")

// Set environment variables KAPOW_DATA_URL and KAPOW_CONTROL_URL only if they aren't set so we don't overwrite user's preferences
if _, exist := os.LookupEnv("KAPOW_DATA_URL"); !exist {
Expand All @@ -57,10 +56,6 @@ var ServerCmd = &cobra.Command{
os.Setenv("KAPOW_CONTROL_URL", "http://"+sConf.ControlBindAddr)
}

if debug {
logger.RegisterLogger(logger.SCRIPTS, nil)
}

server.StartServer(sConf)

if len(args) > 0 {
Expand All @@ -83,11 +78,7 @@ var ServerCmd = &cobra.Command{
log.Printf("Done running powfile: %q\n", powfile)
}

if debug {
processLogs()
} else {
select {}
}
select {}
},
}

Expand Down Expand Up @@ -123,12 +114,3 @@ func validateServerCommandArguments(cmd *cobra.Command, args []string) error {

return nil
}

func processLogs() {

for {
if !logger.ProcessMsg(logger.SCRIPTS) {
break
}
}
}
2 changes: 2 additions & 0 deletions internal/server/model/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,6 @@ type Route struct {
// Index is this route position in the server's routes list.
// It is an output field, its value is ignored as input.
Index int `json:"index"`

Debug bool `json:"debug"`
}
5 changes: 3 additions & 2 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ type ServerConfig struct {
CertFile,
ClientCaFile string

ClientAuth bool
ClientAuth,
Debug bool
}

// StartServer Starts one instance of each server in a goroutine and remains listening on a channel for trace events generated by them
Expand All @@ -41,7 +42,7 @@ func StartServer(config ServerConfig) {
wg.Add(3)
go control.Run(config.ControlBindAddr, &wg)
go data.Run(config.DataBindAddr, &wg)
go user.Run(config.UserBindAddr, &wg, config.CertFile, config.KeyFile, config.ClientCaFile, config.ClientAuth)
go user.Run(config.UserBindAddr, &wg, config.CertFile, config.KeyFile, config.ClientCaFile, config.ClientAuth, config.Debug)

// Wait for servers signals in order to return
wg.Wait()
Expand Down
49 changes: 32 additions & 17 deletions internal/server/user/mux/handlerbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,21 @@ package mux

import (
"bufio"
"bytes"
"log"
"net/http"
"io"
"os"

"github.com/google/uuid"

"github.com/BBVA/kapow/internal/logger"
"github.com/BBVA/kapow/internal/server/data"
"github.com/BBVA/kapow/internal/server/model"
"github.com/BBVA/kapow/internal/server/user/spawn"
)

var spawner = spawn.Spawn
var idGenerator = uuid.NewUUID
var logHandler io.Writer = os.Stdout

func handlerBuilder(route model.Route) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -51,29 +52,43 @@ func handlerBuilder(route model.Route) http.Handler {
data.Handlers.Add(h)
defer data.Handlers.Remove(h.ID)

stdOut := &bytes.Buffer{}
stdErr := &bytes.Buffer{}
err = spawner(h, stdOut, stdErr)
//err = spawner(h, nil)
if route.Debug {
var stdOutR, stdOutW *os.File
stdOutR, stdOutW, err = os.Pipe()
defer stdOutW.Close()
if err != nil {
log.Println(err)
return
}
var stdErrR, stdErrW *os.File
stdErrR, stdErrW, err = os.Pipe()
defer stdErrW.Close()
if err != nil {
log.Println(err)
return
}

go logStream(h.ID, "stdout", stdOutR)
go logStream(h.ID, "stderr", stdErrR)

err = spawner(h, stdOutW, stdErrW)
} else {
err = spawner(h, nil, nil)
}


if err != nil {
log.Println(err)
}

logger.SendMsg(logger.SCRIPTS, createLogMsg(h.ID, *stdOut, *stdErr))
})
}

func createLogMsg(handlerId string, stdout, stderr bytes.Buffer) logger.LogMsg {
var messages []string
scanner := bufio.NewScanner(bytes.NewBuffer(stdout.Bytes()))
func logStream(handlerId string, streamName string, stream *os.File) {
defer stream.Close()
execLog := log.New(logHandler, "", log.Ldate|log.Ltime|log.LUTC|log.Lmicroseconds)
scanner := bufio.NewScanner(stream)
for scanner.Scan() {
messages = append(messages, scanner.Text())
execLog.Printf("%s %s: %s", handlerId, streamName, scanner.Text())
}
scanner = bufio.NewScanner(bytes.NewBuffer(stderr.Bytes()))
for scanner.Scan() {
messages = append(messages, scanner.Text())
}

return logger.LogMsg{Prefix: handlerId, Messages: messages}
}
76 changes: 46 additions & 30 deletions internal/server/user/mux/handlerbuilder_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// +build !race

/*
* Copyright 2019 Banco Bilbao Vizcaya Argentaria, S.A.
*
Expand Down Expand Up @@ -25,6 +27,7 @@ import (
"reflect"
"strings"
"testing"
"time"

"github.com/google/uuid"

Expand Down Expand Up @@ -199,50 +202,63 @@ func TestHandlerBuilderRemovesHandlerWhenDone(t *testing.T) {
}
}

func TestCreateLogMsgAdsPrefixInfo(t *testing.T) {
expected := "FOO"
func TestHandlerBuilderLogToLogHandlerWhenDebugIsEnabled(t *testing.T) {
data.Handlers = data.New()
route := model.Route{Debug: true}
var got string

logHandler = new(bytes.Buffer)

msg := createLogMsg(expected, bytes.Buffer{}, bytes.Buffer{})
spawner = func(h *model.Handler, out io.Writer, er io.Writer) error {
_, _ = out.Write([]byte("this is stdout"))
_, _ = er.Write([]byte("this is stderr"))

if msg.Prefix != expected {
t.Errorf("LogMsg doesn't contain expected Prefix. Expected: %s, got: %s", expected, msg.Prefix)
return nil
}
}

func TestCreateLogMsgAdsStdOutInfo(t *testing.T) {
expected := "FOO\nBAR"
out := bytes.Buffer{}
out.WriteString(expected)
handlerBuilder(route).ServeHTTP(nil, nil)

msg := createLogMsg("", out, bytes.Buffer{})
// NOTE: logStream will write stdout and stderr contents eventually.
// We do not have any control the goroutines running logStream, thus we
// cannot use a synchronization primitive to wait for them. Sorry.
time.Sleep(1 * time.Second)

if strings.Join(msg.Messages, "\n") != expected {
t.Errorf("LogMsg doesn't contain expected payload. Expected: %s, got: %s", expected, msg.Prefix)
got = logHandler.(*bytes.Buffer).String()
if ! strings.Contains(got, "this is stdout") {
t.Errorf("Stdout not preserved. Actual: %+q", got)
}
if ! strings.Contains(got, "this is stderr") {
t.Errorf("Stderr not preserved. Actual: %+q", got)
}
}

func TestCreateLogMsgAdsStdErrInfo(t *testing.T) {
expected := "FOO\nBAR"
err := bytes.Buffer{}
err.WriteString(expected)

msg := createLogMsg("", bytes.Buffer{}, err)
func TestHandlerBuilderDoesNotLogToLogHandlerWhenDebugIsDisabled(t *testing.T) {
data.Handlers = data.New()
route := model.Route{Debug: false}

logHandler = new(bytes.Buffer)

spawner = func(h *model.Handler, out io.Writer, er io.Writer) error {
if out != nil {
_, _ = out.Write([]byte("this is stdout"))
}
if er != nil {
_, _ = er.Write([]byte("this is stderr"))
}

if strings.Join(msg.Messages, "\n") != expected {
t.Errorf("LogMsg doesn't contain expected payload. Expected: %s, got: %s", expected, msg.Prefix)
return nil
}
}

func TestCreateLogMsgAdsStdOutAndStdErrInfo(t *testing.T) {
expected := "FOO\nBAR\nFOO BAZ"
out := bytes.Buffer{}
out.WriteString("FOO\nBAR\n")
err := bytes.Buffer{}
err.WriteString("FOO BAZ")
handlerBuilder(route).ServeHTTP(nil, nil)

msg := createLogMsg("", out, err)
// NOTE: logStream will write stdout and stderr contents eventually.
// We do not have any control the goroutines running logStream, thus we
// cannot use a synchronization primitive to wait for them. Sorry.
time.Sleep(1 * time.Second)

if strings.Join(msg.Messages, "\n") != expected {
t.Errorf("LogMsg doesn't contain expected payload. Expected: %s, got: %s", expected, msg.Prefix)
size := logHandler.(*bytes.Buffer).Len()
if size != 0 {
t.Error("Something was logged to stderr with debug=false")
}
}
2 changes: 2 additions & 0 deletions internal/server/user/mux/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

"github.com/BBVA/kapow/internal/server/model"
"github.com/BBVA/kapow/internal/server/user/spawn"
"github.com/gorilla/mux"
)

Expand Down Expand Up @@ -227,6 +228,7 @@ func TestServeHTTPCallsInnerMuxAfterAcquiringLock(t *testing.T) {
}

func TestUpdateUpdatesMuxWithProvideRouteList(t *testing.T) {
spawner = spawn.Spawn
sm := New()
rs := []model.Route{
{
Expand Down
9 changes: 8 additions & 1 deletion internal/server/user/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,20 @@ var Server = http.Server{
Handler: mux.New(),
}

var DebugEndpoints bool

// Run finishes configuring Server and runs ListenAndServe on it
func Run(bindAddr string, wg *sync.WaitGroup, certFile, keyFile, cliCaFile string, cliAuth bool) {
func Run(bindAddr string, wg *sync.WaitGroup, certFile, keyFile, cliCaFile string, cliAuth, debug bool) {

Server = http.Server{
Addr: bindAddr,
Handler: mux.New(),
}

if debug {
Routes.SetDebug()
}

listener, err := net.Listen("tcp", bindAddr)
if err != nil {
log.Fatal(err)
Expand Down
6 changes: 6 additions & 0 deletions internal/server/user/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
type safeRouteList struct {
rs []model.Route
m *sync.RWMutex
globalDebug bool
}

var Routes safeRouteList = New()
Expand All @@ -38,9 +39,14 @@ func New() safeRouteList {
}
}

func (srl *safeRouteList) SetDebug() {
srl.globalDebug = true
}

func (srl *safeRouteList) Append(r model.Route) model.Route {
srl.m.Lock()
r.Index = len(srl.rs)
r.Debug = srl.globalDebug || r.Debug
srl.rs = append(srl.rs, r)
srl.m.Unlock()

Expand Down

0 comments on commit dbd1a6a

Please sign in to comment.