diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d4b4666480a..311b8dd2019 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -131,6 +131,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix panic when more than 32767 pipeline clients are active. {issue}38197[38197] {pull}38556[38556] - Fix filestream's registry GC: registry entries are now removed from the in-memory and disk store when they're older than the set TTL {issue}36761[36761] {pull}38488[38488] - [threatintel] MISP splitting fix for empty responses {issue}38739[38739] {pull}38917[38917] +- Prevent GCP Pub/Sub input blockage by increasing default value of `max_outstanding_messages` {issue}35029[35029] {pull}38985[38985] *Heartbeat* diff --git a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl index c5861174636..8215bc3c389 100644 --- a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl +++ b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl @@ -57,7 +57,8 @@ #subscription.num_goroutines: 1 # Maximum number of unprocessed messages to allow at any time. - #subscription.max_outstanding_messages: 1000 + # This must be at least queue.mem.flush.min_events to prevent input blockage. + #subscription.max_outstanding_messages: 1600 # Path to a JSON file containing the credentials and key used to subscribe. credentials_file: ${path.config}/my-pubsub-subscriber-credentials.json diff --git a/x-pack/filebeat/docs/inputs/input-gcp-pubsub.asciidoc b/x-pack/filebeat/docs/inputs/input-gcp-pubsub.asciidoc index 69e6313cad9..6287b19a5ed 100644 --- a/x-pack/filebeat/docs/inputs/input-gcp-pubsub.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-gcp-pubsub.asciidoc @@ -73,7 +73,10 @@ set `subscription.max_outstanding_messages`. Default is 1. The maximum number of unprocessed messages (unacknowledged but not yet expired). If the value is negative, then there will be no limit on the number of -unprocessed messages. Default is 1000. +unprocessed messages. Due to the presence of internal queue, the input gets +blocked until `queue.mem.flush.min_events` or `queue.mem.flush.timeout` +is reached. To prevent this blockage, this option must be at least +`queue.mem.flush.min_events`. Default is 1600. [float] ==== `credentials_file` diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 6aa6707fbdd..0c7cab1acb1 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -2964,7 +2964,8 @@ filebeat.inputs: #subscription.num_goroutines: 1 # Maximum number of unprocessed messages to allow at any time. - #subscription.max_outstanding_messages: 1000 + # This must be at least queue.mem.flush.min_events to prevent input blockage. + #subscription.max_outstanding_messages: 1600 # Path to a JSON file containing the credentials and key used to subscribe. credentials_file: ${path.config}/my-pubsub-subscriber-credentials.json diff --git a/x-pack/filebeat/input/gcppubsub/config.go b/x-pack/filebeat/input/gcppubsub/config.go index dd4214645ea..e83d09f12f3 100644 --- a/x-pack/filebeat/input/gcppubsub/config.go +++ b/x-pack/filebeat/input/gcppubsub/config.go @@ -73,7 +73,9 @@ func defaultConfig() config { Type: "gcp-pubsub", } c.Subscription.NumGoroutines = 1 - c.Subscription.MaxOutstandingMessages = 1000 + // The input gets blocked until flush.min_events or flush.timeout is reached. + // Hence max_outstanding_message has to be at least flush.min_events to avoid this blockage. + c.Subscription.MaxOutstandingMessages = 1600 c.Subscription.Create = true return c } diff --git a/x-pack/filebeat/input/gcppubsub/pubsub_test.go b/x-pack/filebeat/input/gcppubsub/pubsub_test.go index bceff5dc3c0..7981a3ee772 100644 --- a/x-pack/filebeat/input/gcppubsub/pubsub_test.go +++ b/x-pack/filebeat/input/gcppubsub/pubsub_test.go @@ -7,7 +7,7 @@ package gcppubsub import ( "context" "errors" - "io/ioutil" + "io" "net/http" "os" "strconv" @@ -70,7 +70,7 @@ func testSetup(t *testing.T) (*pubsub.Client, context.CancelFunc) { } defer resp.Body.Close() - _, err = ioutil.ReadAll(resp.Body) + _, err = io.ReadAll(resp.Body) if err != nil { t.Fatal("failed to read response", err) }