From 0e5e3b3c89e4c0aba4b01ed2028c9badaf506dfc Mon Sep 17 00:00:00 2001 From: kcreddy Date: Tue, 16 Apr 2024 20:10:34 +0530 Subject: [PATCH 1/3] Prevent GCP Pub/Sub input blockage by increasing default value --- CHANGELOG.next.asciidoc | 1 + .../filebeat.inputs.reference.xpack.yml.tmpl | 3 +- .../docs/inputs/input-gcp-pubsub.asciidoc | 5 +- x-pack/filebeat/filebeat.reference.yml | 287 +++++++++--------- x-pack/filebeat/input/gcppubsub/config.go | 4 +- .../filebeat/input/gcppubsub/pubsub_test.go | 4 +- 6 files changed, 156 insertions(+), 148 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d4b4666480a..5aaa152b306 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -131,6 +131,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix panic when more than 32767 pipeline clients are active. {issue}38197[38197] {pull}38556[38556] - Fix filestream's registry GC: registry entries are now removed from the in-memory and disk store when they're older than the set TTL {issue}36761[36761] {pull}38488[38488] - [threatintel] MISP splitting fix for empty responses {issue}38739[38739] {pull}38917[38917] +- Prevent GCP Pub/Sub input blockage by increasing default value of `max_outstanding_messages` {issue}35029[35029] {pull}38917[38917] *Heartbeat* diff --git a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl index c5861174636..8215bc3c389 100644 --- a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl +++ b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl @@ -57,7 +57,8 @@ #subscription.num_goroutines: 1 # Maximum number of unprocessed messages to allow at any time. - #subscription.max_outstanding_messages: 1000 + # This must be at least queue.mem.flush.min_events to prevent input blockage. 
+ #subscription.max_outstanding_messages: 1600 # Path to a JSON file containing the credentials and key used to subscribe. credentials_file: ${path.config}/my-pubsub-subscriber-credentials.json diff --git a/x-pack/filebeat/docs/inputs/input-gcp-pubsub.asciidoc b/x-pack/filebeat/docs/inputs/input-gcp-pubsub.asciidoc index 69e6313cad9..6287b19a5ed 100644 --- a/x-pack/filebeat/docs/inputs/input-gcp-pubsub.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-gcp-pubsub.asciidoc @@ -73,7 +73,10 @@ set `subscription.max_outstanding_messages`. Default is 1. The maximum number of unprocessed messages (unacknowledged but not yet expired). If the value is negative, then there will be no limit on the number of -unprocessed messages. Default is 1000. +unprocessed messages. Due to the presence of an internal queue, the input gets +blocked until `queue.mem.flush.min_events` or `queue.mem.flush.timeout` +is reached. To prevent this blockage, this option must be at least +`queue.mem.flush.min_events`. Default is 1600. [float] ==== `credentials_file` diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 6aa6707fbdd..6836b393275 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -1500,147 +1500,147 @@ filebeat.modules: #var.password: #------------------------------ Salesforce Module ------------------------------ -# Configuration file for Salesforce module in Filebeat - -# Common Configurations: -# - enabled: Set to true to enable ingestion of Salesforce module fileset -# - initial_interval: Initial interval for log collection. This setting determines the time period for which the logs will be initially collected when the ingestion process starts, i.e. 
1d/h/m/s -# - api_version: API version for Salesforce, version should be greater than 46.0 - -# Authentication Configurations: -# User-Password Authentication: -# - enabled: Set to true to enable user-password authentication -# - client.id: Client ID for user-password authentication -# - client.secret: Client secret for user-password authentication -# - token_url: Token URL for user-password authentication -# - username: Username for user-password authentication -# - password: Password for user-password authentication - -# JWT Authentication: -# - enabled: Set to true to enable JWT authentication -# - client.id: Client ID for JWT authentication -# - client.username: Username for JWT authentication -# - client.key_path: Path to client key for JWT authentication -# - url: Audience URL for JWT authentication - -# Event Monitoring: -# - real_time: Set to true to enable real-time logging using object type data collection -# - real_time_interval: Interval for real-time logging - -# Event Log File: -# - event_log_file: Set to true to enable event log file type data collection -# - elf_interval: Interval for event log file -# - log_file_interval: Interval type for log file collection, either Hourly or Daily - -- module: salesforce - - apex: - enabled: false - var.initial_interval: 1d - var.api_version: 56 - - var.authentication: - user_password_flow: - enabled: true - client.id: "" - client.secret: "" - token_url: "" - username: "" - password: "" - jwt_bearer_flow: - enabled: false - client.id: "" - client.username: "" - client.key_path: "" - url: "https://login.salesforce.com" - - var.url: "https://instance_id.my.salesforce.com" - - var.event_log_file: true - var.elf_interval: 1h - var.log_file_interval: "Hourly" - - login: - enabled: false - var.initial_interval: 1d - var.api_version: 56 - - var.authentication: - user_password_flow: - enabled: true - client.id: "" - client.secret: "client-secret" - token_url: "" - username: "" - password: "" - jwt_bearer_flow: - enabled: 
false - client.id: "" - client.username: "" - client.key_path: "" - url: "https://login.salesforce.com" - - var.url: "https://instance_id.my.salesforce.com" - - var.event_log_file: true - var.elf_interval: 1h - var.log_file_interval: "Hourly" - - var.real_time: true - var.real_time_interval: 5m - - logout: - enabled: false - var.initial_interval: 1d - var.api_version: 56 - - var.authentication: - user_password_flow: - enabled: true - client.id: "" - client.secret: "client-secret" - token_url: "" - username: "" - password: "" - jwt_bearer_flow: - enabled: false - client.id: "" - client.username: "" - client.key_path: "" - url: "https://login.salesforce.com" - - var.url: "https://instance_id.my.salesforce.com" - - var.event_log_file: true - var.elf_interval: 1h - var.log_file_interval: "Hourly" - - var.real_time: true - var.real_time_interval: 5m - - setupaudittrail: - enabled: false - var.initial_interval: 1d - var.api_version: 56 - - var.authentication: - user_password_flow: - enabled: true - client.id: "" - client.secret: "client-secret" - token_url: "" - username: "" - password: "" - jwt_bearer_flow: - enabled: false - client.id: "" - client.username: "" - client.key_path: "" - url: "https://login.salesforce.com" - - var.url: "https://instance_id.my.salesforce.com" - - var.real_time: true +# Configuration file for Salesforce module in Filebeat + +# Common Configurations: +# - enabled: Set to true to enable ingestion of Salesforce module fileset +# - initial_interval: Initial interval for log collection. This setting determines the time period for which the logs will be initially collected when the ingestion process starts, i.e. 
1d/h/m/s +# - api_version: API version for Salesforce, version should be greater than 46.0 + +# Authentication Configurations: +# User-Password Authentication: +# - enabled: Set to true to enable user-password authentication +# - client.id: Client ID for user-password authentication +# - client.secret: Client secret for user-password authentication +# - token_url: Token URL for user-password authentication +# - username: Username for user-password authentication +# - password: Password for user-password authentication + +# JWT Authentication: +# - enabled: Set to true to enable JWT authentication +# - client.id: Client ID for JWT authentication +# - client.username: Username for JWT authentication +# - client.key_path: Path to client key for JWT authentication +# - url: Audience URL for JWT authentication + +# Event Monitoring: +# - real_time: Set to true to enable real-time logging using object type data collection +# - real_time_interval: Interval for real-time logging + +# Event Log File: +# - event_log_file: Set to true to enable event log file type data collection +# - elf_interval: Interval for event log file +# - log_file_interval: Interval type for log file collection, either Hourly or Daily + +- module: salesforce + + apex: + enabled: false + var.initial_interval: 1d + var.api_version: 56 + + var.authentication: + user_password_flow: + enabled: true + client.id: "" + client.secret: "" + token_url: "" + username: "" + password: "" + jwt_bearer_flow: + enabled: false + client.id: "" + client.username: "" + client.key_path: "" + url: "https://login.salesforce.com" + + var.url: "https://instance_id.my.salesforce.com" + + var.event_log_file: true + var.elf_interval: 1h + var.log_file_interval: "Hourly" + + login: + enabled: false + var.initial_interval: 1d + var.api_version: 56 + + var.authentication: + user_password_flow: + enabled: true + client.id: "" + client.secret: "client-secret" + token_url: "" + username: "" + password: "" + jwt_bearer_flow: + enabled: 
false + client.id: "" + client.username: "" + client.key_path: "" + url: "https://login.salesforce.com" + + var.url: "https://instance_id.my.salesforce.com" + + var.event_log_file: true + var.elf_interval: 1h + var.log_file_interval: "Hourly" + + var.real_time: true + var.real_time_interval: 5m + + logout: + enabled: false + var.initial_interval: 1d + var.api_version: 56 + + var.authentication: + user_password_flow: + enabled: true + client.id: "" + client.secret: "client-secret" + token_url: "" + username: "" + password: "" + jwt_bearer_flow: + enabled: false + client.id: "" + client.username: "" + client.key_path: "" + url: "https://login.salesforce.com" + + var.url: "https://instance_id.my.salesforce.com" + + var.event_log_file: true + var.elf_interval: 1h + var.log_file_interval: "Hourly" + + var.real_time: true + var.real_time_interval: 5m + + setupaudittrail: + enabled: false + var.initial_interval: 1d + var.api_version: 56 + + var.authentication: + user_password_flow: + enabled: true + client.id: "" + client.secret: "client-secret" + token_url: "" + username: "" + password: "" + jwt_bearer_flow: + enabled: false + client.id: "" + client.username: "" + client.key_path: "" + url: "https://login.salesforce.com" + + var.url: "https://instance_id.my.salesforce.com" + + var.real_time: true var.real_time_interval: 5m #----------------------------- Google Santa Module ----------------------------- - module: santa @@ -2963,8 +2963,9 @@ filebeat.inputs: # Number of goroutines to create to read from the subscription. #subscription.num_goroutines: 1 - # Maximum number of unprocessed messages to allow at any time. - #subscription.max_outstanding_messages: 1000 + # Maximum number of unprocessed messages to allow at any time. + # This must be at least queue.mem.flush.min_events to prevent input blockage. + #subscription.max_outstanding_messages: 1600 # Path to a JSON file containing the credentials and key used to subscribe. 
credentials_file: ${path.config}/my-pubsub-subscriber-credentials.json diff --git a/x-pack/filebeat/input/gcppubsub/config.go b/x-pack/filebeat/input/gcppubsub/config.go index dd4214645ea..e83d09f12f3 100644 --- a/x-pack/filebeat/input/gcppubsub/config.go +++ b/x-pack/filebeat/input/gcppubsub/config.go @@ -73,7 +73,9 @@ func defaultConfig() config { Type: "gcp-pubsub", } c.Subscription.NumGoroutines = 1 - c.Subscription.MaxOutstandingMessages = 1000 + // The input gets blocked until flush.min_events or flush.timeout is reached. + // Hence max_outstanding_messages has to be at least flush.min_events to avoid this blockage. + c.Subscription.MaxOutstandingMessages = 1600 c.Subscription.Create = true return c } diff --git a/x-pack/filebeat/input/gcppubsub/pubsub_test.go b/x-pack/filebeat/input/gcppubsub/pubsub_test.go index bceff5dc3c0..7981a3ee772 100644 --- a/x-pack/filebeat/input/gcppubsub/pubsub_test.go +++ b/x-pack/filebeat/input/gcppubsub/pubsub_test.go @@ -7,7 +7,7 @@ package gcppubsub import ( "context" "errors" - "io/ioutil" + "io" "net/http" "os" "strconv" @@ -70,7 +70,7 @@ func testSetup(t *testing.T) (*pubsub.Client, context.CancelFunc) { } defer resp.Body.Close() - _, err = ioutil.ReadAll(resp.Body) + _, err = io.ReadAll(resp.Body) if err != nil { t.Fatal("failed to read response", err) } From 2b5d11449fc5b20322fbe3ca9ccb94a992e077ef Mon Sep 17 00:00:00 2001 From: kcreddy Date: Tue, 16 Apr 2024 20:17:00 +0530 Subject: [PATCH 2/3] update PR number --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 5aaa152b306..311b8dd2019 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -131,7 +131,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix panic when more than 32767 pipeline clients are active. 
{issue}38197[38197] {pull}38556[38556] - Fix filestream's registry GC: registry entries are now removed from the in-memory and disk store when they're older than the set TTL {issue}36761[36761] {pull}38488[38488] - [threatintel] MISP splitting fix for empty responses {issue}38739[38739] {pull}38917[38917] -- Prevent GCP Pub/Sub input blockage by increasing default value of `max_outstanding_messages` {issue}35029[35029] {pull}38917[38917] +- Prevent GCP Pub/Sub input blockage by increasing default value of `max_outstanding_messages` {issue}35029[35029] {pull}38985[38985] *Heartbeat* From c24d64c4dca0b47cea11936eb3ae776115731bb3 Mon Sep 17 00:00:00 2001 From: kcreddy Date: Tue, 16 Apr 2024 20:20:52 +0530 Subject: [PATCH 3/3] refactor --- x-pack/filebeat/filebeat.reference.yml | 284 ++++++++++++------------- 1 file changed, 142 insertions(+), 142 deletions(-) diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 6836b393275..0c7cab1acb1 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -1500,147 +1500,147 @@ filebeat.modules: #var.password: #------------------------------ Salesforce Module ------------------------------ -# Configuration file for Salesforce module in Filebeat - -# Common Configurations: -# - enabled: Set to true to enable ingestion of Salesforce module fileset -# - initial_interval: Initial interval for log collection. This setting determines the time period for which the logs will be initially collected when the ingestion process starts, i.e. 
1d/h/m/s -# - api_version: API version for Salesforce, version should be greater than 46.0 - -# Authentication Configurations: -# User-Password Authentication: -# - enabled: Set to true to enable user-password authentication -# - client.id: Client ID for user-password authentication -# - client.secret: Client secret for user-password authentication -# - token_url: Token URL for user-password authentication -# - username: Username for user-password authentication -# - password: Password for user-password authentication - -# JWT Authentication: -# - enabled: Set to true to enable JWT authentication -# - client.id: Client ID for JWT authentication -# - client.username: Username for JWT authentication -# - client.key_path: Path to client key for JWT authentication -# - url: Audience URL for JWT authentication - -# Event Monitoring: -# - real_time: Set to true to enable real-time logging using object type data collection -# - real_time_interval: Interval for real-time logging - -# Event Log File: -# - event_log_file: Set to true to enable event log file type data collection -# - elf_interval: Interval for event log file -# - log_file_interval: Interval type for log file collection, either Hourly or Daily - -- module: salesforce - - apex: - enabled: false - var.initial_interval: 1d - var.api_version: 56 - - var.authentication: - user_password_flow: - enabled: true - client.id: "" - client.secret: "" - token_url: "" - username: "" - password: "" - jwt_bearer_flow: - enabled: false - client.id: "" - client.username: "" - client.key_path: "" - url: "https://login.salesforce.com" - - var.url: "https://instance_id.my.salesforce.com" - - var.event_log_file: true - var.elf_interval: 1h - var.log_file_interval: "Hourly" - - login: - enabled: false - var.initial_interval: 1d - var.api_version: 56 - - var.authentication: - user_password_flow: - enabled: true - client.id: "" - client.secret: "client-secret" - token_url: "" - username: "" - password: "" - jwt_bearer_flow: - enabled: 
false - client.id: "" - client.username: "" - client.key_path: "" - url: "https://login.salesforce.com" - - var.url: "https://instance_id.my.salesforce.com" - - var.event_log_file: true - var.elf_interval: 1h - var.log_file_interval: "Hourly" - - var.real_time: true - var.real_time_interval: 5m - - logout: - enabled: false - var.initial_interval: 1d - var.api_version: 56 - - var.authentication: - user_password_flow: - enabled: true - client.id: "" - client.secret: "client-secret" - token_url: "" - username: "" - password: "" - jwt_bearer_flow: - enabled: false - client.id: "" - client.username: "" - client.key_path: "" - url: "https://login.salesforce.com" - - var.url: "https://instance_id.my.salesforce.com" - - var.event_log_file: true - var.elf_interval: 1h - var.log_file_interval: "Hourly" - - var.real_time: true - var.real_time_interval: 5m - - setupaudittrail: - enabled: false - var.initial_interval: 1d - var.api_version: 56 - - var.authentication: - user_password_flow: - enabled: true - client.id: "" - client.secret: "client-secret" - token_url: "" - username: "" - password: "" - jwt_bearer_flow: - enabled: false - client.id: "" - client.username: "" - client.key_path: "" - url: "https://login.salesforce.com" - - var.url: "https://instance_id.my.salesforce.com" - - var.real_time: true +# Configuration file for Salesforce module in Filebeat + +# Common Configurations: +# - enabled: Set to true to enable ingestion of Salesforce module fileset +# - initial_interval: Initial interval for log collection. This setting determines the time period for which the logs will be initially collected when the ingestion process starts, i.e. 
1d/h/m/s +# - api_version: API version for Salesforce, version should be greater than 46.0 + +# Authentication Configurations: +# User-Password Authentication: +# - enabled: Set to true to enable user-password authentication +# - client.id: Client ID for user-password authentication +# - client.secret: Client secret for user-password authentication +# - token_url: Token URL for user-password authentication +# - username: Username for user-password authentication +# - password: Password for user-password authentication + +# JWT Authentication: +# - enabled: Set to true to enable JWT authentication +# - client.id: Client ID for JWT authentication +# - client.username: Username for JWT authentication +# - client.key_path: Path to client key for JWT authentication +# - url: Audience URL for JWT authentication + +# Event Monitoring: +# - real_time: Set to true to enable real-time logging using object type data collection +# - real_time_interval: Interval for real-time logging + +# Event Log File: +# - event_log_file: Set to true to enable event log file type data collection +# - elf_interval: Interval for event log file +# - log_file_interval: Interval type for log file collection, either Hourly or Daily + +- module: salesforce + + apex: + enabled: false + var.initial_interval: 1d + var.api_version: 56 + + var.authentication: + user_password_flow: + enabled: true + client.id: "" + client.secret: "" + token_url: "" + username: "" + password: "" + jwt_bearer_flow: + enabled: false + client.id: "" + client.username: "" + client.key_path: "" + url: "https://login.salesforce.com" + + var.url: "https://instance_id.my.salesforce.com" + + var.event_log_file: true + var.elf_interval: 1h + var.log_file_interval: "Hourly" + + login: + enabled: false + var.initial_interval: 1d + var.api_version: 56 + + var.authentication: + user_password_flow: + enabled: true + client.id: "" + client.secret: "client-secret" + token_url: "" + username: "" + password: "" + jwt_bearer_flow: + enabled: 
false + client.id: "" + client.username: "" + client.key_path: "" + url: "https://login.salesforce.com" + + var.url: "https://instance_id.my.salesforce.com" + + var.event_log_file: true + var.elf_interval: 1h + var.log_file_interval: "Hourly" + + var.real_time: true + var.real_time_interval: 5m + + logout: + enabled: false + var.initial_interval: 1d + var.api_version: 56 + + var.authentication: + user_password_flow: + enabled: true + client.id: "" + client.secret: "client-secret" + token_url: "" + username: "" + password: "" + jwt_bearer_flow: + enabled: false + client.id: "" + client.username: "" + client.key_path: "" + url: "https://login.salesforce.com" + + var.url: "https://instance_id.my.salesforce.com" + + var.event_log_file: true + var.elf_interval: 1h + var.log_file_interval: "Hourly" + + var.real_time: true + var.real_time_interval: 5m + + setupaudittrail: + enabled: false + var.initial_interval: 1d + var.api_version: 56 + + var.authentication: + user_password_flow: + enabled: true + client.id: "" + client.secret: "client-secret" + token_url: "" + username: "" + password: "" + jwt_bearer_flow: + enabled: false + client.id: "" + client.username: "" + client.key_path: "" + url: "https://login.salesforce.com" + + var.url: "https://instance_id.my.salesforce.com" + + var.real_time: true var.real_time_interval: 5m #----------------------------- Google Santa Module ----------------------------- - module: santa @@ -2963,7 +2963,7 @@ filebeat.inputs: # Number of goroutines to create to read from the subscription. #subscription.num_goroutines: 1 - # Maximum number of unprocessed messages to allow at any time. + # Maximum number of unprocessed messages to allow at any time. # This must be at least queue.mem.flush.min_events to prevent input blockage. #subscription.max_outstanding_messages: 1600