From 628854d2c2862e15876e14760375cd6c099cc5e9 Mon Sep 17 00:00:00 2001 From: Lei Qiu Date: Fri, 10 Apr 2020 15:18:24 -0700 Subject: [PATCH] Cherry-pick #16354 to 7.x: Update filebeat httpjson input to support pagination via Header and Okta module (#17669) * Update filebeat httpjson input to support pagination via Header and Okta module (#16354) * Update filebeat httpjson input to support pagination via Header and Okta module (cherry picked from commit a6a2d49f75fd78df80e4116157fe6e1772f67bf5) * Update CHANGELOG.next.asciidoc --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-httpjson.asciidoc | 33 ++- x-pack/filebeat/input/httpjson/config.go | 62 ++++- .../filebeat/input/httpjson/httpjson_test.go | 194 +++++++++++++- x-pack/filebeat/input/httpjson/input.go | 248 ++++++++++++++---- 5 files changed, 464 insertions(+), 74 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ba4820ce695..d688f822eb3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -321,6 +321,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Added access_key_id, secret_access_key and session_token into aws module config. {pull}17456[17456] - Improve ECS categorization field mappings for mysql module. {issue}16172[16172] {pull}17491[17491] - Release Google Cloud module as GA. {pull}17511[17511] +- Update filebeat httpjson input to support pagination via Header and Okta module. {pull}16354[16354] *Heartbeat* diff --git a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc index faafb72e45f..441bcde7f6e 100644 --- a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc @@ -91,10 +91,15 @@ Time duration between repeated data retrievals. Default: 0s, meaning no repeated If the HTTP API returns data in a JSON array, then this option can be set to decode these records from the array. Default: not used. +[float] +==== `no_http_body` + +If set, do not use HTTP request body. Default: false. + [float] ==== `pagination.enabled` -This option specifies whether pagination is enabled. Default: false. +This option specifies whether pagination is enabled. Default: true. [float] ==== `pagination.extra_body_content` @@ -102,6 +107,16 @@ This option specifies whether pagination is enabled. Default: false. Any additional data that needs to be set in the HTTP pagination request can be specified in this JSON blob. Default: not used. +[float] +==== `pagination.header.field_name` + +The field name in the HTTP Header that is used for pagination control. + +[float] +==== `pagination.header.regex_pattern` + +The regular expression pattern to use for retrieving the pagination information from the HTTP Header field specified above. + [float] ==== `pagination.id_field` @@ -120,6 +135,22 @@ Required when pagination is enabled. This specifies the URL for sending pagination request. Required if the pagination URL is different than the HTTP API URL. +[float] +==== `rate_limit.limit` + +This specifies the field in the HTTP Header of the response that specifies the total limit. + +[float] +==== `rate_limit.remaining` + +This specifies the field in the HTTP Header of the response that specifies the remaining quota of the rate limit. + +[float] +==== `rate_limit.reset` + +This specifies the field in the HTTP Header of the response that specifies the epoch time +when the rate limit will be reset. + [float] ==== `ssl` diff --git a/x-pack/filebeat/input/httpjson/config.go b/x-pack/filebeat/input/httpjson/config.go index 6839f6f3b2f..cb1e12ba417 100644 --- a/x-pack/filebeat/input/httpjson/config.go +++ b/x-pack/filebeat/input/httpjson/config.go @@ -5,6 +5,7 @@ package httpjson import ( + "regexp" "strings" "time" @@ -16,27 +17,49 @@ import ( // Config contains information about httpjson configuration type config struct { - APIKey string `config:"api_key"` - HTTPClientTimeout time.Duration `config:"http_client_timeout"` - HTTPHeaders common.MapStr `config:"http_headers"` - HTTPMethod string `config:"http_method" validate:"required"` - HTTPRequestBody common.MapStr `config:"http_request_body"` - Interval time.Duration `config:"interval"` - JSONObjects string `config:"json_objects_array"` - Pagination *Pagination `config:"pagination"` - TLS *tlscommon.Config `config:"ssl"` - URL string `config:"url" validate:"required"` + APIKey string `config:"api_key"` + AuthenticationScheme string `config:"authentication_scheme"` + HTTPClientTimeout time.Duration `config:"http_client_timeout"` + HTTPHeaders common.MapStr `config:"http_headers"` + HTTPMethod string `config:"http_method" validate:"required"` + HTTPRequestBody common.MapStr `config:"http_request_body"` + Interval time.Duration `config:"interval"` + JSONObjects string `config:"json_objects_array"` + NoHTTPBody bool `config:"no_http_body"` + Pagination *Pagination `config:"pagination"` + RateLimit *RateLimit `config:"rate_limit"` + TLS *tlscommon.Config `config:"ssl"` + URL string `config:"url" validate:"required"` } // Pagination contains information about httpjson pagination settings type Pagination struct { - IsEnabled bool `config:"enabled"` + Enabled *bool `config:"enabled"` ExtraBodyContent common.MapStr `config:"extra_body_content"` + Header *Header `config:"header"` IDField string `config:"id_field"` RequestField string `config:"req_field"` URL string `config:"url"` } +// IsEnabled returns true if the `enable` field is set to true in the yaml. +func (p *Pagination) IsEnabled() bool { + return p != nil && (p.Enabled == nil || *p.Enabled) +} + +// HTTP Header information for pagination +type Header struct { + FieldName string `config:"field_name" validate:"required"` + RegexPattern *regexp.Regexp `config:"regex_pattern" validate:"required"` +} + +// HTTP Header Rate Limit information +type RateLimit struct { + Limit string `config:"limit"` + Reset string `config:"reset"` + Remaining string `config:"remaining"` +} + func (c *config) Validate() error { switch strings.ToUpper(c.HTTPMethod) { case "GET": @@ -44,7 +67,22 @@ func (c *config) Validate() error { case "POST": break default: - return errors.Errorf("httpjson input: Invalid http_method, %s - ", c.HTTPMethod) + return errors.Errorf("httpjson input: Invalid http_method, %s", c.HTTPMethod) + } + if c.NoHTTPBody { + if len(c.HTTPRequestBody) > 0 { + return errors.Errorf("invalid configuration: both no_http_body and http_request_body cannot be set simultaneously") + } + if c.Pagination != nil && (len(c.Pagination.ExtraBodyContent) > 0 || c.Pagination.RequestField != "") { + return errors.Errorf("invalid configuration: both no_http_body and pagination.extra_body_content or pagination.req_field cannot be set simultaneously") + } + } + if c.Pagination != nil { + if c.Pagination.Header != nil { + if c.Pagination.RequestField != "" || c.Pagination.IDField != "" || len(c.Pagination.ExtraBodyContent) > 0 { + return errors.Errorf("invalid configuration: both pagination.header and pagination.req_field or pagination.id_field or pagination.extra_body_content cannot be set simultaneously") + } + } } return nil } diff --git a/x-pack/filebeat/input/httpjson/httpjson_test.go b/x-pack/filebeat/input/httpjson/httpjson_test.go index afeece9fad0..107cc3778a6 100644 --- a/x-pack/filebeat/input/httpjson/httpjson_test.go +++ b/x-pack/filebeat/input/httpjson/httpjson_test.go @@ -11,6 +11,7 @@ import ( "log" "net/http" "net/http/httptest" + "regexp" "sync" "testing" @@ -35,7 +36,7 @@ func testSetup(t *testing.T) { }) } -func runTest(t *testing.T, isTLS bool, m map[string]interface{}, run func(input *httpjsonInput, out *stubOutleter, t *testing.T)) { +func runTest(t *testing.T, isTLS bool, m map[string]interface{}, run func(input *HttpjsonInput, out *stubOutleter, t *testing.T)) { testSetup(t) // Create an http test server according to whether TLS is used var newServer = httptest.NewServer @@ -90,7 +91,7 @@ func runTest(t *testing.T, isTLS bool, m map[string]interface{}, run func(input if err != nil { t.Fatal(err) } - input := in.(*httpjsonInput) + input := in.(*HttpjsonInput) defer input.Stop() run(input, eventOutlet, t) @@ -152,12 +153,191 @@ func (o *stubOutleter) OnEvent(event beat.Event) bool { // --- Test Cases +func TestConfigValidationCase1(t *testing.T) { + m := map[string]interface{}{ + "http_method": "GET", + "http_request_body": map[string]interface{}{"test": "abc"}, + "no_http_body": true, + "url": "localhost", + } + cfg := common.MustNewConfigFrom(m) + conf := defaultConfig() + if err := cfg.Unpack(&conf); err == nil { + t.Fatal("Configuration validation failed. no_http_body and http_request_body cannot coexist.") + } +} + +func TestConfigValidationCase2(t *testing.T) { + m := map[string]interface{}{ + "http_method": "GET", + "no_http_body": true, + "pagination": map[string]interface{}{"extra_body_content": map[string]interface{}{"test": "abc"}}, + "url": "localhost", + } + cfg := common.MustNewConfigFrom(m) + conf := defaultConfig() + if err := cfg.Unpack(&conf); err == nil { + t.Fatal("Configuration validation failed. no_http_body and pagination.extra_body_content cannot coexist.") + } +} + +func TestConfigValidationCase3(t *testing.T) { + m := map[string]interface{}{ + "http_method": "GET", + "no_http_body": true, + "pagination": map[string]interface{}{"req_field": "abc"}, + "url": "localhost", + } + cfg := common.MustNewConfigFrom(m) + conf := defaultConfig() + if err := cfg.Unpack(&conf); err == nil { + t.Fatal("Configuration validation failed. no_http_body and pagination.req_field cannot coexist.") + } +} + +func TestConfigValidationCase4(t *testing.T) { + m := map[string]interface{}{ + "http_method": "GET", + "pagination": map[string]interface{}{"header": map[string]interface{}{"field_name": "Link", "regex_pattern": "<([^>]+)>; *rel=\"next\"(?:,|$)"}, "req_field": "abc"}, + "url": "localhost", + } + cfg := common.MustNewConfigFrom(m) + conf := defaultConfig() + if err := cfg.Unpack(&conf); err == nil { + t.Fatal("Configuration validation failed. pagination.header and pagination.req_field cannot coexist.") + } +} + +func TestConfigValidationCase5(t *testing.T) { + m := map[string]interface{}{ + "http_method": "GET", + "pagination": map[string]interface{}{"header": map[string]interface{}{"field_name": "Link", "regex_pattern": "<([^>]+)>; *rel=\"next\"(?:,|$)"}, "id_field": "abc"}, + "url": "localhost", + } + cfg := common.MustNewConfigFrom(m) + conf := defaultConfig() + if err := cfg.Unpack(&conf); err == nil { + t.Fatal("Configuration validation failed. pagination.header and pagination.id_field cannot coexist.") + } +} + +func TestConfigValidationCase6(t *testing.T) { + m := map[string]interface{}{ + "http_method": "GET", + "pagination": map[string]interface{}{"header": map[string]interface{}{"field_name": "Link", "regex_pattern": "<([^>]+)>; *rel=\"next\"(?:,|$)"}, "extra_body_content": map[string]interface{}{"test": "abc"}}, + "url": "localhost", + } + cfg := common.MustNewConfigFrom(m) + conf := defaultConfig() + if err := cfg.Unpack(&conf); err == nil { + t.Fatal("Configuration validation failed. pagination.header and extra_body_content cannot coexist.") + } +} + +func TestConfigValidationCase7(t *testing.T) { + m := map[string]interface{}{ + "http_method": "DELETE", + "no_http_body": true, + "url": "localhost", + } + cfg := common.MustNewConfigFrom(m) + conf := defaultConfig() + if err := cfg.Unpack(&conf); err == nil { + t.Fatal("Configuration validation failed. http_method DELETE is not allowed.") + } +} + +func TestGetNextLinkFromHeader(t *testing.T) { + header := make(http.Header) + header.Add("Link", "; rel=\"self\"") + header.Add("Link", "; rel=\"next\"") + re, _ := regexp.Compile("<([^>]+)>; *rel=\"next\"(?:,|$)") + url, err := getNextLinkFromHeader(header, "Link", re) + if url != "https://dev-168980.okta.com/api/v1/logs?after=1581658181086_1" { + t.Fatal("Failed to test getNextLinkFromHeader. URL " + url + " is not expected") + } + if err != nil { + t.Fatal("Failed to test getNextLinkFromHeader with error:", err) + } +} + +func TestCreateRequestInfoFromBody(t *testing.T) { + m := map[string]interface{}{ + "id": 100, + } + extraBodyContent := common.MapStr{"extra_body": "abc"} + ri, err := createRequestInfoFromBody(common.MapStr(m), "id", "pagination_id", extraBodyContent, "https://test-123", &RequestInfo{ + URL: "", + ContentMap: common.MapStr{}, + Headers: common.MapStr{}, + }) + if ri.URL != "https://test-123" { + t.Fatal("Failed to test createRequestInfoFromBody. URL should be https://test-123.") + } + p, err := ri.ContentMap.GetValue("pagination_id") + if err != nil { + t.Fatal("Failed to test createRequestInfoFromBody with error", err) + } + switch pt := p.(type) { + case int: + if pt != 100 { + t.Fatalf("Failed to test createRequestInfoFromBody. pagination_id value %d should be 100.", pt) + } + default: + t.Fatalf("Failed to test createRequestInfoFromBody. pagination_id value %T should be int.", pt) + } + b, err := ri.ContentMap.GetValue("extra_body") + if err != nil { + t.Fatal("Failed to test createRequestInfoFromBody with error", err) + } + switch bt := b.(type) { + case string: + if bt != "abc" { + t.Fatalf("Failed to test createRequestInfoFromBody. extra_body value %s does not match \"abc\".", bt) + } + default: + t.Fatalf("Failed to test createRequestInfoFromBody. extra_body type %T should be string.", bt) + } +} + +func TestGetRateLimitCase1(t *testing.T) { + header := make(http.Header) + header.Add("X-Rate-Limit-Limit", "120") + header.Add("X-Rate-Limit-Remaining", "118") + header.Add("X-Rate-Limit-Reset", "1581658643") + rateLimit := &RateLimit{ + Limit: "X-Rate-Limit-Limit", + Reset: "X-Rate-Limit-Reset", + Remaining: "X-Rate-Limit-Remaining", + } + epoch, err := getRateLimit(header, rateLimit) + if err != nil || epoch != 0 { + t.Fatal("Failed to test getRateLimit.") + } +} + +func TestGetRateLimitCase2(t *testing.T) { + header := make(http.Header) + header.Add("X-Rate-Limit-Limit", "10") + header.Add("X-Rate-Limit-Remaining", "0") + header.Add("X-Rate-Limit-Reset", "1581658643") + rateLimit := &RateLimit{ + Limit: "X-Rate-Limit-Limit", + Reset: "X-Rate-Limit-Reset", + Remaining: "X-Rate-Limit-Remaining", + } + epoch, err := getRateLimit(header, rateLimit) + if err != nil || epoch != 1581658643 { + t.Fatal("Failed to test getRateLimit.") + } +} + func TestGET(t *testing.T) { m := map[string]interface{}{ "http_method": "GET", "interval": 0, } - runTest(t, false, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) { + runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { group, _ := errgroup.WithContext(context.Background()) group.Go(input.run) @@ -179,7 +359,7 @@ func TestGetHTTPS(t *testing.T) { "interval": 0, "ssl.verification_mode": "none", } - runTest(t, true, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) { + runTest(t, true, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { group, _ := errgroup.WithContext(context.Background()) group.Go(input.run) @@ -201,7 +381,7 @@ func TestPOST(t *testing.T) { "http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}}, "interval": 0, } - runTest(t, false, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) { + runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { group, _ := errgroup.WithContext(context.Background()) group.Go(input.run) @@ -223,7 +403,7 @@ func TestRepeatedPOST(t *testing.T) { "http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}}, "interval": 10 ^ 9, } - runTest(t, false, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) { + runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { group, _ := errgroup.WithContext(context.Background()) group.Go(input.run) @@ -244,7 +424,7 @@ func TestRunStop(t *testing.T) { "http_method": "GET", "interval": 0, } - runTest(t, false, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) { + runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { input.Run() input.Stop() input.Run() diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index c9fcc7087f9..c7a67a6e1a4 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -8,8 +8,11 @@ import ( "bytes" "context" "encoding/json" + "io" "io/ioutil" "net/http" + "regexp" + "strconv" "sync" "time" @@ -38,7 +41,7 @@ func init() { } } -type httpjsonInput struct { +type HttpjsonInput struct { config log *logp.Logger outlet channel.Outleter // Output of received messages. @@ -50,7 +53,7 @@ type httpjsonInput struct { workerWg sync.WaitGroup // Waits on worker goroutine. } -type requestInfo struct { +type RequestInfo struct { URL string ContentMap common.MapStr Headers common.MapStr @@ -92,7 +95,7 @@ func NewInput( // to be recreated with each restart. workerCtx, workerCancel := context.WithCancel(inputCtx) - in := &httpjsonInput{ + in := &HttpjsonInput{ config: conf, log: logp.NewLogger("httpjson").With( "url", conf.URL), @@ -108,7 +111,7 @@ func NewInput( // Run starts the input worker then returns. Only the first invocation // will ever start the worker. -func (in *httpjsonInput) Run() { +func (in *HttpjsonInput) Run() { in.workerOnce.Do(func() { in.workerWg.Add(1) go func() { @@ -125,9 +128,17 @@ func (in *httpjsonInput) Run() { } // createHTTPRequest creates an HTTP/HTTPs request for the input -func (in *httpjsonInput) createHTTPRequest(ctx context.Context, ri *requestInfo) (*http.Request, error) { - b, _ := json.Marshal(ri.ContentMap) - body := bytes.NewReader(b) +func (in *HttpjsonInput) createHTTPRequest(ctx context.Context, ri *RequestInfo) (*http.Request, error) { + var body io.Reader + if len(ri.ContentMap) == 0 || in.config.NoHTTPBody { + body = nil + } else { + b, err := json.Marshal(ri.ContentMap) + if err != nil { + return nil, err + } + body = bytes.NewReader(b) + } req, err := http.NewRequest(in.config.HTTPMethod, ri.URL, body) if err != nil { return nil, err @@ -137,7 +148,11 @@ func (in *httpjsonInput) createHTTPRequest(ctx context.Context, ri *requestInfo) req.Header.Set("Content-Type", "application/json") req.Header.Set("User-Agent", userAgent) if in.config.APIKey != "" { - req.Header.Set("Authorization", in.config.APIKey) + if in.config.AuthenticationScheme != "" { + req.Header.Set("Authorization", in.config.AuthenticationScheme+" "+in.config.APIKey) + } else { + req.Header.Set("Authorization", in.config.APIKey) + } } for k, v := range ri.Headers { switch vv := v.(type) { @@ -149,8 +164,125 @@ func (in *httpjsonInput) createHTTPRequest(ctx context.Context, ri *requestInfo) return req, nil } +// processEventArray publishes an event for each object contained in the array. It returns the last object in the array and an error if any. +func (in *HttpjsonInput) processEventArray(events []interface{}) (map[string]interface{}, error) { + var m map[string]interface{} + for _, t := range events { + switch v := t.(type) { + case map[string]interface{}: + m = v + d, err := json.Marshal(v) + if err != nil { + return nil, errors.Wrapf(err, "failed to marshal %+v", v) + } + ok := in.outlet.OnEvent(makeEvent(string(d))) + if !ok { + return nil, errors.New("function OnEvent returned false") + } + default: + return nil, errors.Errorf("expected only JSON objects in the array but got a %T", v) + } + } + return m, nil +} + +// getNextLinkFromHeader retrieves the next URL for pagination from the HTTP Header of the response +func getNextLinkFromHeader(header http.Header, fieldName string, re *regexp.Regexp) (string, error) { + links, ok := header[fieldName] + if !ok { + return "", errors.Errorf("field %s does not exist in the HTTP Header", fieldName) + } + for _, link := range links { + matchArray := re.FindAllStringSubmatch(link, -1) + if len(matchArray) == 1 { + return matchArray[0][1], nil + } + } + return "", nil +} + +// getRateLimit get the rate limit value if specified in the HTTP Header of the response +func getRateLimit(header http.Header, rateLimit *RateLimit) (int64, error) { + if rateLimit != nil { + if rateLimit.Remaining != "" { + remaining := header.Get(rateLimit.Remaining) + if remaining == "" { + return 0, errors.Errorf("field %s does not exist in the HTTP Header, or is empty", rateLimit.Remaining) + } + m, err := strconv.ParseInt(remaining, 10, 64) + if err != nil { + return 0, errors.Wrapf(err, "failed to parse rate-limit remaining value") + } + if m == 0 { + reset := header.Get(rateLimit.Reset) + if reset == "" { + return 0, errors.Errorf("field %s does not exist in the HTTP Header, or is empty", rateLimit.Reset) + } + epoch, err := strconv.ParseInt(reset, 10, 64) + if err != nil { + return 0, errors.Wrapf(err, "failed to parse rate-limit reset value") + } + return epoch, nil + } + } + } + return 0, nil +} + +// applyRateLimit applies appropriate rate limit if specified in the HTTP Header of the response +func (in *HttpjsonInput) applyRateLimit(ctx context.Context, header http.Header, rateLimit *RateLimit) error { + epoch, err := getRateLimit(header, rateLimit) + if err != nil { + return err + } + if epoch == 0 { + return nil + } + t := time.Unix(epoch, 0) + in.log.Debugf("Rate Limit: Wait until %v for the rate limit to reset.", t) + ticker := time.NewTicker(time.Until(t)) + defer ticker.Stop() + select { + case <-ctx.Done(): + in.log.Info("Context done.") + return nil + case <-ticker.C: + in.log.Debug("Rate Limit: time is up.") + return nil + } +} + +// createRequestInfoFromBody creates a new RequestInfo for a new HTTP request in pagination based on HTTP response body +func createRequestInfoFromBody(m common.MapStr, idField string, requestField string, extraBodyContent common.MapStr, url string, ri *RequestInfo) (*RequestInfo, error) { + v, err := m.GetValue(idField) + if err != nil { + if err == common.ErrKeyNotFound { + return nil, nil + } else { + return nil, errors.Wrapf(err, "failed to retrieve id_field for pagination") + } + } + if requestField != "" { + ri.ContentMap.Put(requestField, v) + if url != "" { + ri.URL = url + } + } else { + switch vt := v.(type) { + case string: + ri.URL = vt + default: + return nil, errors.New("pagination ID is not of string type") + } + } + if len(extraBodyContent) > 0 { + ri.ContentMap.Update(extraBodyContent) + } + return ri, nil +} + // processHTTPRequest processes HTTP request, and handles pagination if enabled -func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Client, ri *requestInfo) error { +func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Client, ri *RequestInfo) error { for { req, err := in.createHTTPRequest(ctx, ri) if err != nil { @@ -161,6 +293,7 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl return errors.Wrapf(err, "failed to execute http client.Do") } responseData, err := ioutil.ReadAll(msg.Body) + header := msg.Header msg.Body.Close() if err != nil { return errors.Wrapf(err, "failed to read http.response.body") @@ -170,76 +303,83 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl return errors.Errorf("http request was unsuccessful with a status code %d", msg.StatusCode) } var m, v interface{} + var mm map[string]interface{} err = json.Unmarshal(responseData, &m) if err != nil { + in.log.Debug("failed to unmarshal http.response.body", string(responseData)) return errors.Wrapf(err, "failed to unmarshal http.response.body") } - switch mmap := m.(type) { + switch obj := m.(type) { + // Top level Array + case []interface{}: + mm, err = in.processEventArray(obj) + if err != nil { + return err + } case map[string]interface{}: if in.config.JSONObjects == "" { - ok := in.outlet.OnEvent(makeEvent(string(responseData))) - if !ok { - return errors.New("function OnEvent returned false") + mm, err = in.processEventArray([]interface{}{obj}) + if err != nil { + return err } } else { - v, err = common.MapStr(mmap).GetValue(in.config.JSONObjects) + v, err = common.MapStr(mm).GetValue(in.config.JSONObjects) if err != nil { return err } switch ts := v.(type) { case []interface{}: - for _, t := range ts { - switch tv := t.(type) { - case map[string]interface{}: - d, err := json.Marshal(tv) - if err != nil { - return errors.Wrapf(err, "failed to marshal json_objects_array") - } - ok := in.outlet.OnEvent(makeEvent(string(d))) - if !ok { - return errors.New("function OnEvent returned false") - } - default: - return errors.New("invalid json_objects_array configuration") - } + mm, err = in.processEventArray(ts) + if err != nil { + return err } default: - return errors.New("invalid json_objects_array configuration") + return errors.Errorf("content of %s is not a valid array", in.config.JSONObjects) } } - if in.config.Pagination != nil && in.config.Pagination.IsEnabled { - v, err = common.MapStr(mmap).GetValue(in.config.Pagination.IDField) + default: + in.log.Debug("http.response.body is not a valid JSON object", string(responseData)) + return errors.Errorf("http.response.body is not a valid JSON object, but a %T", obj) + } + + if mm != nil && in.config.Pagination != nil && in.config.Pagination.IsEnabled() { + if in.config.Pagination.Header != nil { + // Pagination control using HTTP Header + url, err := getNextLinkFromHeader(header, in.config.Pagination.Header.FieldName, in.config.Pagination.Header.RegexPattern) if err != nil { - in.log.Info("Successfully processed HTTP request. Pagination finished.") + return errors.Wrapf(err, "failed to retrieve the next URL for pagination") + } + if ri.URL == url || url == "" { + in.log.Info("Pagination finished.") return nil } - if in.config.Pagination.RequestField != "" { - ri.ContentMap.Put(in.config.Pagination.RequestField, v) - if in.config.Pagination.URL != "" { - ri.URL = in.config.Pagination.URL - } - } else { - switch v.(type) { - case string: - ri.URL = v.(string) - default: - return errors.New("pagination ID is not of string type") - } + ri.URL = url + if err = in.applyRateLimit(ctx, header, in.config.RateLimit); err != nil { + return err + } + in.log.Info("Continuing with pagination to URL: ", ri.URL) + continue + } else { + // Pagination control using HTTP Body fields + ri, err := createRequestInfoFromBody(common.MapStr(mm), in.config.Pagination.IDField, in.config.Pagination.RequestField, common.MapStr(in.config.Pagination.ExtraBodyContent), in.config.Pagination.URL, ri) + if err != nil { + return err } - if in.config.Pagination.ExtraBodyContent != nil { - ri.ContentMap.Update(common.MapStr(in.config.Pagination.ExtraBodyContent)) + if ri == nil { + return nil + } + if err = in.applyRateLimit(ctx, header, in.config.RateLimit); err != nil { + return err } + in.log.Info("Continuing with pagination to URL: ", ri.URL) continue } - return nil - default: - in.log.Debugw("http.response.body is not valid JSON", string(responseData)) - return errors.New("http.response.body is not valid JSON") } + return nil } } -func (in *httpjsonInput) run() error { +func (in *HttpjsonInput) run() error { ctx, cancel := context.WithCancel(in.workerCtx) defer cancel() @@ -268,7 +408,7 @@ func (in *httpjsonInput) run() error { Timeout: in.config.HTTPClientTimeout, } - ri := &requestInfo{ + ri := &RequestInfo{ URL: in.URL, ContentMap: common.MapStr{}, Headers: in.HTTPHeaders, @@ -298,13 +438,13 @@ func (in *httpjsonInput) run() error { } // Stop stops the misp input and waits for it to fully stop. -func (in *httpjsonInput) Stop() { +func (in *HttpjsonInput) Stop() { in.workerCancel() in.workerWg.Wait() } // Wait is an alias for Stop. -func (in *httpjsonInput) Wait() { +func (in *HttpjsonInput) Wait() { in.Stop() }