Skip to content

Commit

Permalink
add heartbeat timeout option (#238)
Browse files Browse the repository at this point in the history
* add heartbeat timeout option
  • Loading branch information
crimson-gao authored Oct 31, 2023
1 parent 7cb5da3 commit 4369a3b
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 6 deletions.
1 change: 1 addition & 0 deletions consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ LogHubConfig是提供给用户的配置类,用于配置消费策略,您可
|CursorPosition|消费的点位|必填,支持 1.BEGIN_CURSOR: logstore的开始点位 2. END_CURSOR: logstore的最新数据点位 3.SPECIAL_TIME_CURSOR: 自行设置的unix时间戳|
||sls的logstore|必填|
|HeartbeatIntervalInSecond|心跳的时间间隔|非必填,默认时间为20s, sdk会根据心跳时间与服务器确认alive|
|HeartbeatTimeoutInSecond|心跳的超时间隔|非必填,默认时间为HeartbeatIntervalInSecond的3倍, sdk会根据心跳时间与服务器确认alive,持续心跳失败达到超时时间后后,服务器可重新分配该超时shard|
|DataFetchIntervalInMs|数据默认拉取的间隔|非必填,默认为200ms|
|MaxFetchLogGroupCount|数据一次拉取的log group数量|非必填,默认为1000|
|CursorStartTime|数据点位的时间戳|非必填,CursorPosition为SPECIAL_TIME_CURSOR时需填写|
Expand Down
6 changes: 5 additions & 1 deletion consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ type LogHubConfig struct {
//:param CursorPosition: This options is used for initialization, will be ignored once consumer group is created and each shard has beeen started to be consumed.
// Provide three options :BEGIN_CURSOR,END_CURSOR,SPECIAL_TIMER_CURSOR,when you choose SPECIAL_TIMER_CURSOR, you have to set CursorStartTime parameter.
//:param HeartbeatIntervalInSecond:
// default 20, once a client doesn't report to server * heartbeat_interval * 3 interval,
// default 20, once a client doesn't report to server * HeartbeatTimeoutInSecond seconds,
// server will consider it's offline and re-assign its task to another consumer.
// don't set the heatbeat interval too small when the network badwidth or performance of consumtion is not so good.
//:param DataFetchIntervalInMs: default 200(Millisecond), don't configure it too small (<100Millisecond)
//:param HeartbeatTimeoutInSecond:
// default HeartbeatIntervalInSecond * 3, once a client doesn't report to server HeartbeatTimeoutInSecond seconds,
// server will consider it's offline and re-assign its task to another consumer.
//:param MaxFetchLogGroupCount: default 1000, fetch size in each request, normally use default. maximum is 1000, could be lower. the lower the size the memory efficiency might be better.
//:param CursorStartTime: Will be used when cursor_position when could be "begin", "end", "specific time format in time stamp", it's log receiving time. The unit of parameter is seconds.
//:param InOrder:
Expand Down Expand Up @@ -49,6 +52,7 @@ type LogHubConfig struct {
ConsumerName string
CursorPosition string
HeartbeatIntervalInSecond int
HeartbeatTimeoutInSecond int
DataFetchIntervalInMs int64
MaxFetchLogGroupCount int
CursorStartTime int64 // Unix time stamp; Units are seconds.
Expand Down
5 changes: 4 additions & 1 deletion consumer/consumer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ func initConsumerClient(option LogHubConfig, logger log.Logger) *ConsumerClient
if option.HeartbeatIntervalInSecond == 0 {
option.HeartbeatIntervalInSecond = 20
}
if option.HeartbeatTimeoutInSecond == 0 {
option.HeartbeatTimeoutInSecond = option.HeartbeatIntervalInSecond * 3
}
if option.DataFetchIntervalInMs == 0 {
option.DataFetchIntervalInMs = 200
}
Expand All @@ -42,7 +45,7 @@ func initConsumerClient(option LogHubConfig, logger log.Logger) *ConsumerClient
}
consumerGroup := sls.ConsumerGroup{
ConsumerGroupName: option.ConsumerGroupName,
Timeout: option.HeartbeatIntervalInSecond * 3,
Timeout: option.HeartbeatTimeoutInSecond,
InOrder: option.InOrder,
}
consumerClient := &ConsumerClient{
Expand Down
8 changes: 4 additions & 4 deletions consumer/shard_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (consumer *ShardConsumerWorker) updateStatus(success bool) {
switch status {
case PULLING:
consumer.setConsumerStatus(PROCESSING)
case INITIALIZING,PROCESSING:
case INITIALIZING, PROCESSING:
consumer.setConsumerStatus(PULLING)
}
}
Expand All @@ -145,14 +145,14 @@ func (consumer *ShardConsumerWorker) updateStatus(success bool) {
}

func (consumer *ShardConsumerWorker) shouldFetch() bool {
if consumer.lastFetchGroupCount >= consumer.client.option.MaxFetchLogGroupCount || consumer.lastFetchRawSize >= 4 * 1024 * 1024 {
if consumer.lastFetchGroupCount >= consumer.client.option.MaxFetchLogGroupCount || consumer.lastFetchRawSize >= 4*1024*1024 {
return true
}
duration := time.Since(consumer.lastFetchTime)
if consumer.lastFetchGroupCount < 100 && consumer.lastFetchRawSize < 1024 * 1024{
if consumer.lastFetchGroupCount < 100 && consumer.lastFetchRawSize < 1024*1024 {
// The time used here is in milliseconds.
return duration > 500*time.Millisecond
} else if consumer.lastFetchGroupCount < 500 && consumer.lastFetchRawSize < 2 * 1024 * 1024 {
} else if consumer.lastFetchGroupCount < 500 && consumer.lastFetchRawSize < 2*1024*1024 {
return duration > 200*time.Millisecond
} else {
return duration > 50*time.Millisecond
Expand Down

0 comments on commit 4369a3b

Please sign in to comment.