ProvisionedThroughputExceededException Error #158

Closed
chenyin0126 opened this issue May 23, 2024 · 5 comments · Fixed by #159, #161 or mskonovalov/kinesis-consumer#1

chenyin0126 commented May 23, 2024

We have 2 pods; each pod runs a consumer built on this library and uses .Scan to consume from all shards. The stream has 8 Kinesis shards, and we hit this error when the 2 consumers run at the same time:

shard shardId-000000000010 error: get records error: operation error Kinesis: GetRecords, exceeded maximum number of attempts, 3, https response error StatusCode: 400, RequestID: f450bd05-0d8a-739b-aa98-773a29ee48de, ProvisionedThroughputExceededException: Rate exceeded for Shard - 715119903224/command-data-stream/shardId-000000000010

I think the cause is that, by default, the consumer calls GetRecords every 0.25 seconds (4 calls per second per shard), while AWS allows only 5 GetRecords transactions per second per shard. When 2 consumers run at the same time and both scan all shards, a single shard can receive 8 GetRecords calls per second, which exceeds the limit and raises ProvisionedThroughputExceededException.

Any idea what is the best way to solve this?

@chenyin0126 (Author)
cc: @harlow

luanruisong (Contributor) commented Jun 12, 2024

Maybe there is something wrong in func isRetriableError.

Error log:

shard {shardId} error: get records error: operation error Kinesis: GetRecords, exceeded maximum number of attempts, 3, https response error StatusCode: 400, RequestID: {requestId}, ProvisionedThroughputExceededException: Rate exceeded for Shard - {shard}

The current implementation:

func isRetriableError(err error) bool {
	switch err.(type) {
	case *types.ExpiredIteratorException:
		return true
	case *types.ProvisionedThroughputExceededException:
		return true
	}
	return false
}

Scan got a ProvisionedThroughputExceededException, but shouldn't this error be retried?

https://github.com/harlow/kinesis-consumer/blob/master/consumer.go#L188

I think the intent is to retry (wait for the next ticker tick), but instead the goroutine exits.

Maybe it should be changed to:

	if oe := (*types.ProvisionedThroughputExceededException)(nil); errors.As(err, &oe) {
		return true
	}

luanruisong (Contributor) commented Jun 12, 2024

BTW, there is also a problem introduced in commit 6720a01: it repeatedly starts new goroutines that process the same shard, until GetRecords fails with ProvisionedThroughputExceededException.

Error log (there are only two shards):

{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213184044,"msg":"[CONSUMER] start scan: {shardId} 49647579764499847879996118179013785811305901676174508770"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213184046,"msg":"[CONSUMER] start scan: {shardId} 49651624479223759978416358002305044641501698271515509490"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213214045,"msg":"[CONSUMER] start scan: {shardId} 49651624479223759978416358045822747370169506734998029042"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213214045,"msg":"[CONSUMER] start scan: {shardId} 49647579764499847879996118207531136970195391339902796514"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213244044,"msg":"[CONSUMER] start scan: {shardId} 49647579764499847879996118236082338052034090689242333922"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213244046,"msg":"[CONSUMER] start scan: {shardId} 49651624479223759978416358091472995244637521131381719794"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213274045,"msg":"[CONSUMER] start scan: {shardId} 49647579764499847879996118265124363016636329277354148578"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213274046,"msg":"[CONSUMER] start scan: {shardId} 49651624479223759978416358130257753389514056238450606834"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213285292,"msg":"[CONSUMER] get records error: operation error Kinesis: GetRecords, exceeded maximum number of attempts, 3, https response error StatusCode: 400, RequestID: {requestId}, ProvisionedThroughputExceededException: Rate exceeded for Shard {{shardId}}"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213288257,"msg":"[CONSUMER] get records error: operation error Kinesis: GetRecords, exceeded maximum number of attempts, 3, https response error StatusCode: 400, RequestID: {requestId}, ProvisionedThroughputExceededException: Rate exceeded for Shard {{shardId}}"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213294575,"msg":"[CONSUMER] get records error: operation error Kinesis: GetRecords, exceeded maximum number of attempts, 3, https response error StatusCode: 400, RequestID: {requestId}, ProvisionedThroughputExceededException: Rate exceeded for Shard {{shardId}}"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213298295,"msg":"[CONSUMER] get records error: operation error Kinesis: GetRecords, exceeded maximum number of attempts, 3, https response error StatusCode: 400, RequestID: {requestId}, ProvisionedThroughputExceededException: Rate exceeded for Shard {{shardId}}"}

I think the problem is in allgroup.go:121.

When the ticker fires, the second loop also sends shards that are already being consumed into the shardc channel again; shards with no parent shard are sent immediately, since there is nothing to wait for:

	for _, shard := range shards {
		if _, ok := g.shards[*shard.ShardId]; ok {
			continue
		}
		g.shards[*shard.ShardId] = shard
		g.shardsClosed[*shard.ShardId] = make(chan struct{})
	}
	for _, shard := range shards {
		shard := shard // Shadow shard, since we use it in goroutine
		var parent1, parent2 <-chan struct{}
		if shard.ParentShardId != nil {
			parent1 = g.shardsClosed[*shard.ParentShardId]
		}
		if shard.AdjacentParentShardId != nil {
			parent2 = g.shardsClosed[*shard.AdjacentParentShardId]
		}
		go func() {
			// Asynchronously wait for all parents of this shard to be processed
			// before providing it out to our client.  Kinesis guarantees that a
			// given partition key's data will be provided to clients in-order,
			// but when splits or joins happen, we need to process all parents prior
			// to processing children or that ordering guarantee is not maintained.
			if waitForCloseChannel(ctx, parent1) && waitForCloseChannel(ctx, parent2) {
				shardc <- shard
			}
		}()
	}

@mskonovalov (Collaborator)

Fix #161

harlow pushed a commit that referenced this issue Sep 16, 2024
Fixes #158. Seems the bug was introduced in #155. See #155 (comment)

@luanruisong (Contributor)

Good job!
