-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement ingester API to return stream rates #7176
Conversation
./tools/diff_coverage.sh ../loki-main/test_results.txt test_results.txt ingester,distributor,querier,querier/queryrange,iter,storage,chunkenc,logql,loki Change in test coverage per package. Green indicates 0 or positive change, red indicates that test coverage for a package fell. + ingester 0.2%
+ distributor 0%
+ querier 0%
+ querier/queryrange 0%
+ iter 0%
+ storage 0%
+ chunkenc 0%
+ logql 0%
+ loki 0% |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting. I've always wondered how rate limiting was implemented. I wish the algorithm would expose the actual rate :/
pkg/ingester/stream.go
Outdated
@@ -387,6 +397,7 @@ func (s *stream) validateEntries(entries []logproto.Entry, isReplay bool) ([]log | |||
} | |||
} | |||
|
|||
s.rateCalculator.Record(int64(totalBytes)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this not using validBytes
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This preserves the current behavior where stream rate limits are calculated against all the bytes in the stream, not just the valid ones. I assume we do it this way because the rate limit is to protect the ingester and we process every entry, valid or not.
pkg/ingester/rate_calculator.go
Outdated
t := time.NewTicker(1000 / bucketCount * time.Millisecond) | ||
for range t.C { | ||
rate := c.sample.Swap(0) | ||
c.storeNewRate(rate) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this never stopped?
**What this PR does / why we need it**: Stop the rateCalculator timer when stream is about to be removed. **Which issue(s) this PR fixes**: N/A
./tools/diff_coverage.sh ../loki-main/test_results.txt test_results.txt ingester,distributor,querier,querier/queryrange,iter,storage,chunkenc,logql,loki Change in test coverage per package. Green indicates 0 or positive change, red indicates that test coverage for a package fell. + ingester 0.2%
+ distributor 0%
+ querier 0%
+ querier/queryrange 0%
+ iter 0%
+ storage 0%
+ chunkenc 0%
+ logql 0%
+ loki 0% |
./tools/diff_coverage.sh ../loki-main/test_results.txt test_results.txt ingester,distributor,querier,querier/queryrange,iter,storage,chunkenc,logql,loki Change in test coverage per package. Green indicates 0 or positive change, red indicates that test coverage for a package fell. + ingester 0.1%
+ distributor 0%
+ querier 0%
+ querier/queryrange 0%
+ iter 0%
+ storage 0%
+ chunkenc 0%
+ logql 0%
+ loki 0% |
./tools/diff_coverage.sh ../loki-main/test_results.txt test_results.txt ingester,distributor,querier,querier/queryrange,iter,storage,chunkenc,logql,loki Change in test coverage per package. Green indicates 0 or positive change, red indicates that test coverage for a package fell. - ingester -0.2%
+ distributor 0%
+ querier 0%
+ querier/queryrange 0%
+ iter 0%
+ storage 0%
+ chunkenc 0%
+ logql 0%
+ loki 0% |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
just a small observation that shouldn't change anything
./tools/diff_coverage.sh ../loki-main/test_results.txt test_results.txt ingester,distributor,querier,querier/queryrange,iter,storage,chunkenc,logql,loki Change in test coverage per package. Green indicates 0 or positive change, red indicates that test coverage for a package fell. - ingester -0.3%
+ distributor 0%
+ querier 0%
+ querier/queryrange 0%
+ iter 0%
+ storage 0%
+ chunkenc 0%
+ logql 0%
+ loki 0% |
This PR introduces an API on the Ingester that reports all current streams in memory and their current rate in bytes/second. The reponse from the API contains a slice of `StreamRate` structs: ```go type StreamRate struct { StreamHash uint64 `protobuf:"varint,1,opt,name=streamHash,proto3" json:"streamHash,omitempty"` StreamHashNoShard uint64 `protobuf:"varint,2,opt,name=streamHashNoShard,proto3" json:"streamHashNoShard,omitempty"` Rate int64 `protobuf:"varint,3,opt,name=rate,proto3" json:"rate,omitempty"` } ``` In the interest of space on the caller and in transmission, only hashes and rates are sent. - `StreamHash` will be used to deduplicate replicas - `StreamHashNoShard` will be used to aggregate a sharded stream's total rate. Co-authored-by: Dylan Guedes <djmgguedes@gmail.com>
This PR introduces an API on the Ingester that reports all current streams in memory and their current rate in bytes/second. The reponse from the API contains a slice of `StreamRate` structs: ```go type StreamRate struct { StreamHash uint64 `protobuf:"varint,1,opt,name=streamHash,proto3" json:"streamHash,omitempty"` StreamHashNoShard uint64 `protobuf:"varint,2,opt,name=streamHashNoShard,proto3" json:"streamHashNoShard,omitempty"` Rate int64 `protobuf:"varint,3,opt,name=rate,proto3" json:"rate,omitempty"` } ``` In the interest of space on the caller and in transmission, only hashes and rates are sent. - `StreamHash` will be used to deduplicate replicas - `StreamHashNoShard` will be used to aggregate a sharded stream's total rate. Co-authored-by: Dylan Guedes <djmgguedes@gmail.com>
This PR introduces an API on the Ingester that reports all current streams in memory and their current rate in bytes/second. The reponse from the API contains a slice of `StreamRate` structs: ```go type StreamRate struct { StreamHash uint64 `protobuf:"varint,1,opt,name=streamHash,proto3" json:"streamHash,omitempty"` StreamHashNoShard uint64 `protobuf:"varint,2,opt,name=streamHashNoShard,proto3" json:"streamHashNoShard,omitempty"` Rate int64 `protobuf:"varint,3,opt,name=rate,proto3" json:"rate,omitempty"` } ``` In the interest of space on the caller and in transmission, only hashes and rates are sent. - `StreamHash` will be used to deduplicate replicas - `StreamHashNoShard` will be used to aggregate a sharded stream's total rate. Co-authored-by: Dylan Guedes <djmgguedes@gmail.com>
This PR introduces an API on the Ingester that reports all current streams in memory and their current rate in bytes/second. The reponse from the API contains a slice of
StreamRate
structs:In the interest of space on the caller and in transmission, only hashes and rates are sent.
StreamHash
will be used to deduplicate replicasStreamHashNoShard
will be used to aggregate a sharded stream's total rate.Rate calculation will be in a follow on PR