Skip to content

Commit

Permalink
Merge pull request #67 from scribd/maksimt/SERF-2325/adjust-kafka-sub…
Browse files Browse the repository at this point in the history
…scriber-config

[SERF-2325] Configure `AutoCommit` for Kafka subscriber
  • Loading branch information
Neurostep authored Feb 16, 2023
2 parents 8c29092 + f21fc05 commit 3d31140
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 5 deletions.
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -867,11 +867,12 @@ To break it down further `Publisher` and `Subscriber` have their own set of conf

**Subscriber**:

| Setting | Description | YAML variable | Environment variable (ENV) | Type | Possible Values |
|----------------|----------------------------------------------------------------------------------------|-------------------|-----------------------------------------------|--------|-----------------|
| Topic | Topic name | `topic` | `APP_PUBSUB_KAFKA_SUBSCRIBER_TOPIC` | string | topic |
| Group ID | Client group id string. All clients sharing the same group id belong to the same group | `group_id` | `APP_PUBSUB_KAFKA_SUBSCRIBER_GROUP_ID` | string | service-name |
| Enable Metrics | Enable publishing metrics for Kafka consumer | `metrics_enabled` | `APP_PUBSUB_KAFKA_SUBSCRIBER_METRICS_ENABLED` | bool | true, false |
| Setting | Description | YAML variable | Environment variable (ENV) | Type | Possible Values |
|-------------------|-----------------------------------------------------------------------------------------|-----------------------|---------------------------------------------------|--------|-----------------|
| Topic | Topic name | `topic` | `APP_PUBSUB_KAFKA_SUBSCRIBER_TOPIC` | string | topic |
| Group ID | Client group id string. All clients sharing the same group id belong to the same group | `group_id` | `APP_PUBSUB_KAFKA_SUBSCRIBER_GROUP_ID` | string | service-name |
| Enable Metrics | Enable publishing metrics for Kafka consumer | `metrics_enabled` | `APP_PUBSUB_KAFKA_SUBSCRIBER_METRICS_ENABLED` | bool | true, false |
| Enable AutoCommit | Enable AutoCommit option for Kafka consumer (default `true`) | `auto_commit.enabled` | `APP_PUBSUB_KAFKA_SUBSCRIBER_AUTO_COMMIT_ENABLED` | bool | true, false |


To authenticate the requests to Kafka, Go SDK provides a configuration set for TLS and [SASL](https://en.wikipedia.org/wiki/Simple_Authentication_and_Security_Layer)
Expand Down
9 changes: 9 additions & 0 deletions pkg/pubsub/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ type (
Enabled bool `mapstructure:"enabled"`
// MetricsEnabled controls if metrics publishing is enabled or not
MetricsEnabled bool `mapstructure:"metrics_enabled"`
// AutoCommit controls if the subscriber should auto commit messages
AutoCommit AutoCommit `mapstructure:"auto_commit"`
}

AutoCommit struct {
// Enabled whether the auto commit is enabled or not
Enabled bool `mapstructure:"enabled"`
}

TLS struct {
Expand Down Expand Up @@ -146,6 +153,8 @@ func NewConfig() (*Config, error) {
config := &Config{}
viperBuilder := cbuilder.New("pubsub")

viperBuilder.SetDefault("kafka.subscriber.auto_commit.enabled", true)

vConf, err := viperBuilder.Build()
if err != nil {
return config, err
Expand Down
40 changes: 40 additions & 0 deletions pkg/pubsub/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func TestNewConfigWithAppRoot(t *testing.T) {
},
Subscriber: Subscriber{
Topic: "test-topic",
AutoCommit: AutoCommit{
Enabled: true,
},
},
SSLVerificationEnabled: true,
},
Expand All @@ -77,12 +80,40 @@ func TestNewConfigWithAppRoot(t *testing.T) {
},
Subscriber: Subscriber{
Topic: "test-topic",
AutoCommit: AutoCommit{
Enabled: true,
},
},
SSLVerificationEnabled: true,
},

envOverrides: [][]string{{"APP_PUBSUB_KAFKA_BROKER_URLS", "localhost:9092 localhost:9093"}},
},
{
name: "NewWithConfigFileWorks (auto_commimt override)",
env: "test",
kafka: Kafka{
BrokerUrls: []string{"localhost:9092"},
ClientId: "test-app",
Cert: "pem string",
CertKey: "pem key",
SecurityProtocol: "ssl",
Publisher: Publisher{
MaxAttempts: 3,
WriteTimeout: 10 * time.Second,
Topic: "test-topic",
},
Subscriber: Subscriber{
Topic: "test-topic",
AutoCommit: AutoCommit{
Enabled: false,
},
},
SSLVerificationEnabled: true,
},

envOverrides: [][]string{{"APP_PUBSUB_KAFKA_SUBSCRIBER_AUTO_COMMIT_ENABLED", "false"}},
},
{
name: "NewWithConfigFileWorks (TLS config override)",
env: "test",
Expand All @@ -99,6 +130,9 @@ func TestNewConfigWithAppRoot(t *testing.T) {
},
Subscriber: Subscriber{
Topic: "test-topic",
AutoCommit: AutoCommit{
Enabled: true,
},
},
SSLVerificationEnabled: true,
TLS: TLS{
Expand Down Expand Up @@ -129,6 +163,9 @@ func TestNewConfigWithAppRoot(t *testing.T) {
},
Subscriber: Subscriber{
Topic: "test-topic",
AutoCommit: AutoCommit{
Enabled: true,
},
},
SASL: SASL{
Enabled: true,
Expand Down Expand Up @@ -158,6 +195,9 @@ func TestNewConfigWithAppRoot(t *testing.T) {
},
Subscriber: Subscriber{
Topic: "test-topic",
AutoCommit: AutoCommit{
Enabled: true,
},
},
SASL: SASL{
Enabled: true,
Expand Down

0 comments on commit 3d31140

Please sign in to comment.