
Commit

Merge branch 'master' into fix-ds-assign
mergify[bot] committed Aug 25, 2021
2 parents 7dc2b81 + cb1d315 commit 96e006d
Showing 19 changed files with 310 additions and 334 deletions.
4 changes: 4 additions & 0 deletions .ci/packaging.groovy
@@ -335,6 +335,10 @@ def tagAndPush(Map args = [:]) {
  }
  // supported image flavours
  def variants = ["", "-oss", "-ubi8"]
  //
  if(beatName == 'elastic-agent'){
    variants.add("-complete")
  }
  variants.each { variant ->
    tags.each { tag ->
      // TODO:
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -201,6 +201,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Preserve annotations in a kubernetes namespace metadata {pull}27045[27045]
- Allow conditional processing in `decode_xml` and `decode_xml_wineventlog`. {pull}27159[27159]
- Fix build constraint that caused issues with doc builds. {pull}27381[27381]
- Do not try to load ILM policy if `check_exists` is `false`. {pull}27508[27508] {issue}26322[26322]

*Auditbeat*

8 changes: 1 addition & 7 deletions filebeat/docs/modules/azure.asciidoc
@@ -83,7 +83,7 @@ Will retrieve azure Active Directory audit logs. The audit logs provide traceabi
`eventhub` ::
_string_
The fully managed, real-time data ingestion service.
Default value `insights-operational-logs`.
Default value is `insights-operational-logs` for activitylogs, `insights-logs-auditlogs` for auditlogs, and `insights-logs-signinlogs` for signinlogs. It is recommended to use a separate eventhub for each log type, as the field mappings differ between log types.

`consumer_group` ::
_string_
@@ -127,12 +127,6 @@ The azure module comes with several predefined dashboards for general cloud over
image::./images/filebeat-azure-overview.png[]


[float]
=== Fields

2 changes: 2 additions & 0 deletions filebeat/input/filestream/input_integration_test.go
@@ -196,6 +196,7 @@ func TestFilestreamCloseEOF(t *testing.T) {

// test_empty_lines from test_harvester.py
func TestFilestreamEmptyLine(t *testing.T) {
	t.Skip("Flaky test https://github.com/elastic/beats/issues/27585")
	env := newInputTestingEnvironment(t)

	testlogName := "test.log"
@@ -693,6 +694,7 @@ func TestFilestreamTruncateCheckOffset(t *testing.T) {
}

func TestFilestreamTruncateBlockedOutput(t *testing.T) {
	t.Skip("Flaky test https://github.com/elastic/beats/issues/27085")
	env := newInputTestingEnvironment(t)
	env.pipeline = &mockPipelineConnector{blocking: true}

11 changes: 11 additions & 0 deletions heartbeat/_meta/config/beat.reference.yml.tmpl
@@ -248,3 +248,14 @@ heartbeat.scheduler:

  # Set the scheduler's time zone
  #location: ''

heartbeat.jobs:
  # Limit the number of concurrent monitors executed by heartbeat. This differs from
  # heartbeat.scheduler.limit in that it applies to individual monitors rather than to
  # the subtasks of monitors. For non-browser monitors a subtask usually corresponds to
  # a single file descriptor.
  # This feature is most useful for the browser type.
  #browser.limit: 1
  #http.limit: 10
  #tcp.limit: 10
  #icmp.limit: 10
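
For readers unfamiliar with how a per-type limit such as `browser.limit: 1` behaves, the sketch below illustrates the underlying pattern: one weighted semaphore per monitor type, held for the duration of a whole monitor. It is a minimal, standalone illustration using golang.org/x/sync/semaphore; the names `typeLimits` and `runMonitor` are hypothetical and not part of Heartbeat — the actual implementation is the `schedJob` code added later in this commit.

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

// typeLimits plays the role of the heartbeat.jobs.<type>.limit settings above:
// one weighted semaphore per monitor type bounds how many monitors of that type
// run concurrently, independently of any global scheduler limit.
var typeLimits = map[string]*semaphore.Weighted{
	"browser": semaphore.NewWeighted(1),
	"http":    semaphore.NewWeighted(10),
}

// runMonitor acquires the per-type slot for the whole monitor, not per subtask,
// which is the distinction drawn against heartbeat.scheduler.limit.
func runMonitor(ctx context.Context, monitorType string, monitor func(context.Context)) error {
	if sem := typeLimits[monitorType]; sem != nil {
		if err := sem.Acquire(ctx, 1); err != nil {
			return err // context cancelled while waiting for a slot
		}
		defer sem.Release(1)
	}
	monitor(ctx)
	return nil
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// With a limit of 1, these three "browser" monitors run one at a time.
			_ = runMonitor(context.Background(), "browser", func(context.Context) {
				fmt.Println("browser monitor", i, "running")
			})
		}(i)
	}
	wg.Wait()
}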
10 changes: 10 additions & 0 deletions heartbeat/heartbeat.reference.yml
@@ -249,6 +249,16 @@ heartbeat.scheduler:
  # Set the scheduler's time zone
  #location: ''

heartbeat.jobs:
  # Limit the number of concurrent monitors executed by heartbeat. This differs from
  # heartbeat.scheduler.limit in that it applies to individual monitors rather than to
  # the subtasks of monitors. For non-browser monitors a subtask usually corresponds to
  # a single file descriptor.
  # This feature is most useful for the browser type.
  #browser.limit: 1
  #http.limit: 10
  #tcp.limit: 10
  #icmp.limit: 10
# ================================== General ===================================

# The name of the shipper that publishes the network data. It can be used to group
124 changes: 124 additions & 0 deletions heartbeat/scheduler/schedjob.go
@@ -0,0 +1,124 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package scheduler

import (
	"context"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"

	"github.com/elastic/beats/v7/libbeat/common/atomic"
)

type schedJob struct {
	id          string
	ctx         context.Context
	scheduler   *Scheduler
	wg          *sync.WaitGroup
	entrypoint  TaskFunc
	jobLimitSem *semaphore.Weighted
	activeTasks atomic.Int
}

// newSchedJob returns a schedJob that runs the given entry point task for the scheduler,
// job ID, and job type. Running the job blocks until all subtasks are completed; subtasks
// are run in separate goroutines, and run returns the time execution began on its first task.
func newSchedJob(ctx context.Context, s *Scheduler, id string, jobType string, task TaskFunc) *schedJob {
	return &schedJob{
		id:          id,
		ctx:         ctx,
		scheduler:   s,
		jobLimitSem: s.jobLimitSem[jobType],
		entrypoint:  task,
		activeTasks: atomic.MakeInt(0),
		wg:          &sync.WaitGroup{},
	}
}

// run runs the job's entry point and all continuations it produces, blocking until every
// task has completed. It returns the time execution began on the first task.
func (sj *schedJob) run() (startedAt time.Time) {
	sj.wg.Add(1)
	sj.activeTasks.Inc()
	if sj.jobLimitSem != nil {
		sj.jobLimitSem.Acquire(sj.ctx, 1)
	}

	startedAt = sj.runTask(sj.entrypoint)

	sj.wg.Wait()
	return startedAt
}

// runTask runs an individual task and its continuations until none are left, with as much
// parallelism as possible. Since task funcs can emit continuations recursively we need a
// function that can execute recursively.
// The wait group passed into this function expects to already have its count incremented by one.
func (sj *schedJob) runTask(task TaskFunc) time.Time {
	defer sj.wg.Done()
	defer sj.activeTasks.Dec()

	// The accounting for waiting/active tasks is done using atomics.
	// Absolute accuracy is not critical here, so the gap between modifying waitingTasks and activeTasks is acceptable.
	sj.scheduler.stats.waitingTasks.Inc()

	// Acquire an execution slot in keeping with heartbeat.scheduler.limit.
	// This should block until resources are available.
	// In the case where the semaphore has free resources immediately
	// it will not block and will not check the cancelled status of the
	// context, which is OK, because we check it later anyway.
	limitErr := sj.scheduler.limitSem.Acquire(sj.ctx, 1)
	sj.scheduler.stats.waitingTasks.Dec()
	if limitErr == nil {
		defer sj.scheduler.limitSem.Release(1)
	}

	// Record the time this task started now that we have a resource to execute with.
	startedAt := time.Now()

	// Check if the scheduler has been shut down. If so, exit early.
	select {
	case <-sj.ctx.Done():
		return startedAt
	default:
		sj.scheduler.stats.activeTasks.Inc()

		continuations := task(sj.ctx)
		sj.scheduler.stats.activeTasks.Dec()

		sj.wg.Add(len(continuations))
		sj.activeTasks.Add(len(continuations))
		for _, cont := range continuations {
			// Run continuations in parallel; note that each will acquire its own execution slot.
			// We can discard the startedAt times for continuations as those are irrelevant.
			go sj.runTask(cont)
		}
		// There is always at least 1 task (the current one). If that's all that remains, we know
		// there are no other tasks active or pending for this job, and we can release the jobLimitSem.
		if sj.jobLimitSem != nil && sj.activeTasks.Load() == 1 {
			sj.jobLimitSem.Release(1)
		}
	}

	return startedAt
}
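
To make the control flow of runTask easier to follow, here is a stripped-down sketch of the same fan-out pattern: a task returns continuations, each continuation runs in its own goroutine, and a WaitGroup tracks completion. The semaphores, stats counters, and start-time bookkeeping are deliberately omitted, and the names Task and runAll are illustrative only, not part of the Beats API.

package main

import (
	"context"
	"fmt"
	"sync"
)

// Task mirrors the shape of the scheduler's TaskFunc: running a task may
// yield follow-up tasks ("continuations") to run afterwards.
type Task func(ctx context.Context) []Task

// runAll executes root and every continuation it transitively produces,
// fanning continuations out to goroutines and blocking until all finish.
// As in schedJob, the WaitGroup count is incremented before each task starts.
func runAll(ctx context.Context, root Task) {
	var wg sync.WaitGroup
	var run func(t Task)
	run = func(t Task) {
		defer wg.Done()
		conts := t(ctx)
		wg.Add(len(conts)) // register continuations before spawning them
		for _, c := range conts {
			go run(c)
		}
	}
	wg.Add(1)
	run(root)
	wg.Wait()
}

func main() {
	leaf := func(ctx context.Context) []Task {
		fmt.Println("continuation finished")
		return nil
	}
	root := func(ctx context.Context) []Task {
		fmt.Println("entry point finished, forking 3 continuations")
		return []Task{leaf, leaf, leaf}
	}
	runAll(context.Background(), root)
}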
122 changes: 122 additions & 0 deletions heartbeat/scheduler/schedjob_test.go
@@ -0,0 +1,122 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package scheduler

import (
	"context"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"github.com/elastic/beats/v7/heartbeat/config"
	batomic "github.com/elastic/beats/v7/libbeat/common/atomic"
	"github.com/elastic/beats/v7/libbeat/monitoring"
)

func TestSchedJobRun(t *testing.T) {
	cancelledCtx, cancel := context.WithCancel(context.Background())
	cancel()

	testCases := []struct {
		name          string
		jobCtx        context.Context
		overLimit     bool
		shouldRunTask bool
	}{
		{
			"context not cancelled",
			context.Background(),
			false,
			true,
		},
		{
			"context cancelled",
			cancelledCtx,
			false,
			false,
		},
		{
			"context cancelled over limit",
			cancelledCtx,
			true,
			false,
		},
	}

	for _, testCase := range testCases {
		t.Run(testCase.name, func(t *testing.T) {
			limit := int64(100)
			s := NewWithLocation(limit, monitoring.NewRegistry(), tarawaTime(), nil)

			if testCase.overLimit {
				s.limitSem.Acquire(context.Background(), limit)
			}

			wg := &sync.WaitGroup{}
			wg.Add(1)
			executed := batomic.MakeBool(false)

			tf := func(ctx context.Context) []TaskFunc {
				executed.Store(true)
				return nil
			}

			beforeStart := time.Now()
			sj := newSchedJob(testCase.jobCtx, s, "myid", "atype", tf)
			startedAt := sj.run()

			// This will panic in the case where we don't check s.limitSem.Acquire
			// for an error value and released an unacquired resource in scheduler.go.
			// In that case this will release one more resource than allowed, causing
			// the panic.
			if testCase.overLimit {
				s.limitSem.Release(limit)
			}

			require.Equal(t, testCase.shouldRunTask, executed.Load())
			require.True(t, startedAt.Equal(beforeStart) || startedAt.After(beforeStart))
		})
	}
}

// TestRecursiveForkingJob tests that a schedJob that splits into multiple parallel pieces executes without error.
func TestRecursiveForkingJob(t *testing.T) {
	s := NewWithLocation(1000, monitoring.NewRegistry(), tarawaTime(), map[string]config.JobLimit{
		"atype": {Limit: 1},
	})
	ran := batomic.NewInt(0)

	var terminalTf TaskFunc = func(ctx context.Context) []TaskFunc {
		ran.Inc()
		return nil
	}
	var forkingTf TaskFunc = func(ctx context.Context) []TaskFunc {
		ran.Inc()
		return []TaskFunc{
			terminalTf, terminalTf, terminalTf,
		}
	}

	sj := newSchedJob(context.Background(), s, "myid", "atype", forkingTf)

	sj.run()
	require.Equal(t, 4, ran.Load())
}
