diff --git a/internal/sinks/provider/http.go b/internal/sinks/provider/http.go index 9571460..f8d939c 100644 --- a/internal/sinks/provider/http.go +++ b/internal/sinks/provider/http.go @@ -2,10 +2,12 @@ package provider import ( "bytes" + "encoding/json" "fmt" "net/http" "time" + "github.com/hashicorp/nomad/api" "github.com/sirupsen/logrus" ) @@ -63,6 +65,15 @@ func NewHTTP(opts HTTPOpts) (*HTTPManager, error) { return httpMgr, nil } +// Prepare takes batches of events and returns JSON encoding of the same. +func (m *HTTPManager) Prepare(events []api.Event) ([]byte, error) { + data, err := json.Marshal(events) + if err != nil { + return nil, err + } + return data, nil +} + // Push sends out events to an HTTP Endpoint. func (m *HTTPManager) Push(data []byte) error { req, err := http.NewRequest("POST", m.rootURL, bytes.NewBuffer(data)) diff --git a/internal/sinks/provider/provider.go b/internal/sinks/provider/provider.go index 5631c1a..c6276d4 100644 --- a/internal/sinks/provider/provider.go +++ b/internal/sinks/provider/provider.go @@ -1,8 +1,12 @@ package provider +import "github.com/hashicorp/nomad/api" + type Provider interface { // Name returns the name of the Provider. Name() string + // Prepare returns a prepared payload as an array of bytes + Prepare([]api.Event) ([]byte, error) // Push pushes a batch of event to upstream. The implementation varies across providers. Push([]byte) error // Ping implements a healthcheck. diff --git a/internal/sinks/worker.go b/internal/sinks/worker.go index 9c336ef..5beae84 100644 --- a/internal/sinks/worker.go +++ b/internal/sinks/worker.go @@ -2,7 +2,6 @@ package sink import ( "context" - "encoding/json" "time" "github.com/hashicorp/nomad/api" @@ -74,13 +73,13 @@ func (w *Worker) flush(batch []api.Event) { return } - data, err := prepareJSON(batch) - if err != nil { - w.log.WithField("batch_len", len(batch)).WithError(err).Error("error while json marshall") - } - w.log.WithField("batch_len", len(batch)).Info("pushing events to providers") for _, prov := range w.providers { + data, err := prov.Prepare(batch) + if err != nil { + w.log.WithField("batch_len", len(batch)).WithError(err).Error("error while json marshall") + } + err = prov.Push(data) if err != nil { // TODO: Handle the error better. @@ -88,12 +87,3 @@ func (w *Worker) flush(batch []api.Event) { } } } - -// prepareJSON takes batches of events and returns JSON encoding of the same. -func prepareJSON(events []api.Event) ([]byte, error) { - data, err := json.Marshal(events) - if err != nil { - return nil, err - } - return data, nil -}