Fixed issue in pipeline network stats where short job stats would never get populated #2720
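Background for the change: PipelineNetworkStats previously began recording only after the concurrency tuner reported a stable value, so a job that finished before that callback fired ended with empty network stats. The sketch below is a simplified illustration of that gating and of the always-start behaviour that replaces it; the type and field names are invented for the example and are not the real AzCopy types.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// statsBefore mimics the old behaviour (simplified, hypothetical names): recording
// is gated on a start timestamp that was only set once the concurrency tuner
// reported a stable value.
type statsBefore struct {
	startSeconds int64
	opCount      int64
}

func (s *statsBefore) record() {
	if atomic.LoadInt64(&s.startSeconds) == 0 {
		return // short job: the tuner never stabilised, so nothing is ever counted
	}
	atomic.AddInt64(&s.opCount, 1)
}

// statsAfter mimics the fixed behaviour: the start time is recorded at
// construction, so every operation is counted regardless of tuner state.
type statsAfter struct {
	startSeconds int64
	opCount      int64
}

func newStatsAfter() *statsAfter {
	s := &statsAfter{}
	atomic.StoreInt64(&s.startSeconds, time.Now().Unix())
	return s
}

func (s *statsAfter) record() {
	atomic.AddInt64(&s.opCount, 1)
}

func main() {
	before, after := &statsBefore{}, newStatsAfter()
	for i := 0; i < 5; i++ { // a "short job" that finishes before any tuner callback
		before.record()
		after.record()
	}
	fmt.Println("ops counted before the fix:", atomic.LoadInt64(&before.opCount)) // 0
	fmt.Println("ops counted after the fix:", atomic.LoadInt64(&after.opCount))   // 5
}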

Merged · 2 commits · Jun 19, 2024
Changes from all commits
4 changes: 2 additions & 2 deletions e2etest/zt_newe2e_basic_functionality_test.go
@@ -14,7 +14,7 @@ type BasicFunctionalitySuite struct{}
 func (s *BasicFunctionalitySuite) Scenario_SingleFile(svm *ScenarioVariationManager) {
     azCopyVerb := ResolveVariation(svm, []AzCopyVerb{AzCopyVerbCopy, AzCopyVerbSync}) // Calculate verb early to create the destination object early
     // Scale up from service to object
-    dstObj := CreateResource[ContainerResourceManager](svm, GetRootResource(svm, ResolveVariation(svm, []common.Location{common.ELocation.Local(), common.ELocation.Blob()})), ResourceDefinitionContainer{}).GetObject(svm, "test", common.EEntityType.File())
+    dstObj := CreateResource[ContainerResourceManager](svm, GetRootResource(svm, ResolveVariation(svm, []common.Location{common.ELocation.Local(), common.ELocation.Blob(), common.ELocation.File()})), ResourceDefinitionContainer{}).GetObject(svm, "test", common.EEntityType.File())
     // The object must exist already if we're syncing.
     if azCopyVerb == AzCopyVerbSync {
         dstObj.Create(svm, NewZeroObjectContentContainer(0), ObjectProperties{})
@@ -27,7 +27,7 @@ func (s *BasicFunctionalitySuite) Scenario_SingleFile(svm *ScenarioVariationMana
 
     body := NewRandomObjectContentContainer(svm, SizeFromString("10K"))
     // Scale up from service to object
-    srcObj := CreateResource[ObjectResourceManager](svm, GetRootResource(svm, ResolveVariation(svm, []common.Location{common.ELocation.Local(), common.ELocation.Blob()})), ResourceDefinitionObject{
+    srcObj := CreateResource[ObjectResourceManager](svm, GetRootResource(svm, ResolveVariation(svm, []common.Location{common.ELocation.Local(), common.ELocation.Blob(), common.ELocation.File()})), ResourceDefinitionObject{
         ObjectName: pointerTo("test"),
         Body: body,
     })
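The test-side change simply adds common.ELocation.File() to the location variation lists, so Scenario_SingleFile now also covers Azure Files as a source and destination. As a rough illustration of what growing a variation list does to the combinations a scenario runs under (hypothetical helper, not the real e2e framework):

package main

import "fmt"

// location stands in for common.Location in the real suite (assumption).
type location string

// pairings fans a scenario out over every source/destination pair, which is
// roughly what the ResolveVariation-driven matrix ends up covering.
func pairings(srcs, dsts []location) [][2]location {
	var combos [][2]location
	for _, src := range srcs {
		for _, dst := range dsts {
			combos = append(combos, [2]location{src, dst})
		}
	}
	return combos
}

func main() {
	before := []location{"Local", "Blob"}
	after := []location{"Local", "Blob", "File"}
	fmt.Println("source/destination pairs before:", len(pairings(before, before))) // 4
	fmt.Println("source/destination pairs after:", len(pairings(after, after)))    // 9
}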
41 changes: 10 additions & 31 deletions ste/xferStatsPolicy.go
@@ -46,27 +46,14 @@ type PipelineNetworkStats struct {
 
 func newPipelineNetworkStats(tunerInterface ConcurrencyTuner) *PipelineNetworkStats {
     s := &PipelineNetworkStats{tunerInterface: tunerInterface}
-    tunerWillCallUs := tunerInterface.RequestCallbackWhenStable(s.start) // we want to start gather stats after the tuner has reached a stable value. No point in gathering them earlier
-    if !tunerWillCallUs {
-        // assume tuner is inactive, and start ourselves now
-        s.start()
-    }
-    return s
-}
-
-// start starts the gathering of stats
-func (s *PipelineNetworkStats) start() {
     atomic.StoreInt64(&s.atomicStartSeconds, time.Now().Unix())
+    return s
 }
 
 func (s *PipelineNetworkStats) getStartSeconds() int64 {
     return atomic.LoadInt64(&s.atomicStartSeconds)
 }
 
-func (s *PipelineNetworkStats) IsStarted() bool {
-    return s.getStartSeconds() > 0
-}
-
 func (s *PipelineNetworkStats) recordRetry(responseBody string) {
     if strings.Contains(responseBody, "gress is over the account limit") { // maybe Ingress or Egress
         atomic.AddInt64(&s.atomic503CountThroughput, 1)
@@ -79,9 +66,6 @@ func (s *PipelineNetworkStats) recordRetry(responseBody string) {
 
 func (s *PipelineNetworkStats) OperationsPerSecond() int {
     s.nocopy.Check()
-    if !s.IsStarted() {
-        return 0
-    }
     elapsed := time.Since(time.Unix(s.getStartSeconds(), 0)).Seconds()
     if elapsed > 0 {
         return int(float64(atomic.LoadInt64(&s.atomicOperationCount)) / elapsed)
@@ -192,26 +176,21 @@ func (s statsPolicy) Do(req *policy.Request) (*http.Response, error) {
     // Grab the notification callback out of the context and, if its there, call it
     stats, ok := req.Raw().Context().Value(pipelineNetworkStatsContextKey).(*PipelineNetworkStats)
     if ok && stats != nil {
-        if stats.IsStarted() {
-            atomic.AddInt64(&stats.atomicOperationCount, 1)
-            atomic.AddInt64(&stats.atomicE2ETotalMilliseconds, int64(time.Since(start).Seconds()*1000))
-
-            if err != nil && !isContextCancelledError(err) {
-                // no response from server
-                atomic.AddInt64(&stats.atomicNetworkErrorCount, 1)
-            }
+        atomic.AddInt64(&stats.atomicOperationCount, 1)
+        atomic.AddInt64(&stats.atomicE2ETotalMilliseconds, int64(time.Since(start).Seconds()*1000))
+
+        if err != nil && !isContextCancelledError(err) {
+            // no response from server
+            atomic.AddInt64(&stats.atomicNetworkErrorCount, 1)
         }
 
-        // always look at retries, even if not started, because concurrency tuner needs to know about them
         // TODO should we also count status 500? It is mentioned here as timeout:https://docs.microsoft.com/en-us/azure/storage/common/storage-scalability-targets
         if response != nil && response.StatusCode == http.StatusServiceUnavailable {
             stats.tunerInterface.recordRetry() // always tell the tuner
-            if stats.IsStarted() { // but only count it here, if we have started
-                // To find out why the server was busy we need to look at the response
-                responseBodyText := transparentlyReadBody(response)
-                stats.recordRetry(responseBodyText)
-            }
+
+            // To find out why the server was busy we need to look at the response
+            responseBodyText := transparentlyReadBody(response)
+            stats.recordRetry(responseBodyText)
         }
     }
 
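With the start time now stored in the constructor, OperationsPerSecond always has a valid baseline and the counters accumulate even for jobs that finish quickly. A minimal sketch of that calculation, using simplified stand-in types rather than the real PipelineNetworkStats:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// pipelineStats is a stand-in for the real struct; only the fields needed
// for the ops/sec calculation are included.
type pipelineStats struct {
	atomicStartSeconds   int64
	atomicOperationCount int64
}

// newPipelineStats mirrors the shape of the fixed constructor: record the
// start time immediately instead of waiting for a tuner callback.
func newPipelineStats() *pipelineStats {
	s := &pipelineStats{}
	atomic.StoreInt64(&s.atomicStartSeconds, time.Now().Unix())
	return s
}

// operationsPerSecond divides the operation count by the elapsed wall-clock
// time since the recorded start, guarding against a zero denominator.
func (s *pipelineStats) operationsPerSecond() int {
	elapsed := time.Since(time.Unix(atomic.LoadInt64(&s.atomicStartSeconds), 0)).Seconds()
	if elapsed > 0 {
		return int(float64(atomic.LoadInt64(&s.atomicOperationCount)) / elapsed)
	}
	return 0
}

func main() {
	s := newPipelineStats()
	for i := 0; i < 100; i++ {
		atomic.AddInt64(&s.atomicOperationCount, 1)
	}
	time.Sleep(2 * time.Second)                      // let some wall-clock time pass
	fmt.Println("ops/sec:", s.operationsPerSecond()) // non-zero: roughly 100 divided by the elapsed seconds
}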