Skip to content

Commit

Permalink
[7.17](backport #30730) x-pack/filebeat/input/httpjson: make sure int…
Browse files Browse the repository at this point in the history
…erval data clearing happens before processing (#31029)

* x-pack/filebeat/input/httpjson: make sure interval data clearing happens before processing (#30730)

(cherry picked from commit 043cab9)

# Conflicts:
#	x-pack/filebeat/input/httpjson/internal/v2/request.go
#	x-pack/filebeat/input/httpjson/internal/v2/response.go

* fix conflicts

Co-authored-by: Dan Kortschak <90160302+efd6@users.noreply.github.com>
Co-authored-by: Dan Kortschak <dan.kortschak@elastic.co>
  • Loading branch information
3 people authored Mar 31, 2022
1 parent 2d90fa0 commit c71a9a4
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 11 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix ECS version string in threatintel to be consistent with other modules and add event.timezone. {issue}30499[30499] {pull}30570[30570]
- Add default paths value to MySQL Enterprise module to prevent issues with pipeline installations {pull}30598[30598]
- Report the starting offset of the line in `log.offset` when using `filestream` instead of the end to be ECS compliant. {pull}30445[30445]
- Fix add_kubernetes_metadata matcher: support rotated logs when `resource_type: pod` {pull}30720[30720]
- Prevent logic race on clearing data during request in httpjson. {pull}30730[30730]
- Fixes data duplication on restart when filestream inputs did not have an ID set. {issue}30061[30061]
- Update documentation for accessing `last_response.url.params` in httpjson input. {pull}30739[30739]
- Fix add_kubernetes_metadata matcher: support rotated logs when `resource_type: pod` {pull}30720[30720]
- Allow fixing data duplication on restart when filestream inputs did not have an ID set. Setting an ID for filestream
Expand Down
10 changes: 2 additions & 8 deletions x-pack/filebeat/input/httpjson/internal/v2/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,9 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
}
defer httpResp.Body.Close()

eventsCh, err := r.responseProcessor.startProcessing(stdCtx, trCtx, httpResp)
if err != nil {
return err
}

trCtx.clearIntervalData()

var n int
for maybeMsg := range eventsCh {
events := r.responseProcessor.startProcessing(stdCtx, trCtx, httpResp)
for maybeMsg := range events {
if maybeMsg.failed() {
r.log.Errorf("error processing response: %v", maybeMsg)
continue
Expand Down
7 changes: 4 additions & 3 deletions x-pack/filebeat/input/httpjson/internal/v2/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ func newResponseProcessor(config *responseConfig, pagination *pagination, log *l
return rp
}

func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resp *http.Response) (<-chan maybeMsg, error) {
ch := make(chan maybeMsg)
func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resp *http.Response) <-chan maybeMsg {
trCtx.clearIntervalData()

ch := make(chan maybeMsg)
go func() {
defer close(ch)

Expand Down Expand Up @@ -137,7 +138,7 @@ func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *tran
}
}()

return ch, nil
return ch
}

func (resp *response) asTransformables(log *logp.Logger) []transformable {
Expand Down

0 comments on commit c71a9a4

Please sign in to comment.