diff --git a/client.go b/client.go index d29c8b8..ee28335 100644 --- a/client.go +++ b/client.go @@ -73,6 +73,9 @@ var ( ErrGRPCBrokerMuxNotSupported = errors.New("client requested gRPC broker multiplexing but plugin does not support the feature") ) +// defaultPluginLogBufferSize is the default size of the buffer used to read from stderr for plugin log lines. +const defaultPluginLogBufferSize = 64 * 1024 + // Client handles the lifecycle of a plugin application. It launches // plugins, connects to them, dispenses interface implementations, and handles // killing the process. @@ -220,6 +223,10 @@ type ClientConfig struct { // it will default to hclog's default logger. Logger hclog.Logger + // PluginLogBufferSize is the buffer size(bytes) to read from stderr for plugin log lines. + // If this is 0, then the default of 64KB is used. + PluginLogBufferSize int + // AutoMTLS has the client and server automatically negotiate mTLS for // transport authentication. This ensures that only the original client will // be allowed to connect to the server, and all other connections will be @@ -416,6 +423,10 @@ func NewClient(config *ClientConfig) (c *Client) { }) } + if config.PluginLogBufferSize == 0 { + config.PluginLogBufferSize = defaultPluginLogBufferSize + } + c = &Client{ config: config, logger: config.Logger, @@ -1146,14 +1157,12 @@ func (c *Client) getGRPCMuxer(addr net.Addr) (*grpcmux.GRPCClientMuxer, error) { return c.grpcMuxer, nil } -var stdErrBufferSize = 64 * 1024 - func (c *Client) logStderr(name string, r io.Reader) { defer c.clientWaitGroup.Done() defer c.stderrWaitGroup.Done() l := c.logger.Named(filepath.Base(name)) - reader := bufio.NewReaderSize(r, stdErrBufferSize) + reader := bufio.NewReaderSize(r, c.config.PluginLogBufferSize) // continuation indicates the previous line was a prefix continuation := false // panic indicates the previous line was the start of a panic output diff --git a/client_test.go b/client_test.go index 11a4b0d..a17bf10 100644 --- a/client_test.go +++ b/client_test.go @@ -7,6 +7,7 @@ import ( "bytes" "context" "crypto/sha256" + "encoding/json" "errors" "fmt" "io" @@ -1483,18 +1484,13 @@ func testClient_logger(t *testing.T, proto string) { // Test that we continue to consume stderr over long lines. func TestClient_logStderr(t *testing.T) { - orig := stdErrBufferSize - stdErrBufferSize = 32 - defer func() { - stdErrBufferSize = orig - }() - stderr := bytes.Buffer{} c := NewClient(&ClientConfig{ Stderr: &stderr, Cmd: &exec.Cmd{ Path: "test", }, + PluginLogBufferSize: 32, }) c.clientWaitGroup.Add(1) @@ -1515,3 +1511,55 @@ this line is short t.Fatalf("\nexpected output: %q\ngot output: %q", msg, read) } } + +func TestClient_logStderrParseJSON(t *testing.T) { + logBuf := bytes.Buffer{} + c := NewClient(&ClientConfig{ + Stderr: bytes.NewBuffer(nil), + Cmd: &exec.Cmd{Path: "test"}, + PluginLogBufferSize: 64, + Logger: hclog.New(&hclog.LoggerOptions{ + Name: "test-logger", + Level: hclog.Trace, + Output: &logBuf, + JSONFormat: true, + }), + }) + c.clientWaitGroup.Add(1) + + msg := `{"@message": "this is a message", "@level": "info"} +{"@message": "this is a large message that is more than 64 bytes long", "@level": "info"}` + reader := strings.NewReader(msg) + + c.stderrWaitGroup.Add(1) + c.logStderr(c.config.Cmd.Path, reader) + logs := strings.Split(strings.TrimSpace(logBuf.String()), "\n") + + wants := []struct { + wantLevel string + wantMessage string + }{ + {"info", "this is a message"}, + {"debug", `{"@message": "this is a large message that is more than 64 bytes`}, + {"debug", ` long", "@level": "info"}`}, + } + + if len(logs) != len(wants) { + t.Fatalf("expected %d logs, got %d", len(wants), len(logs)) + } + + for i, tt := range wants { + l := make(map[string]interface{}) + if err := json.Unmarshal([]byte(logs[i]), &l); err != nil { + t.Fatal(err) + } + + if l["@level"] != tt.wantLevel { + t.Fatalf("expected level %q, got %q", tt.wantLevel, l["@level"]) + } + + if l["@message"] != tt.wantMessage { + t.Fatalf("expected message %q, got %q", tt.wantMessage, l["@message"]) + } + } +}