Skip to content

Commit

Permalink
Fix totalSize and totalCounts calculation (minio#2969)
Browse files Browse the repository at this point in the history
  • Loading branch information
harshavardhana authored and nitisht committed Nov 25, 2019
1 parent 6eaf0dd commit cf6defe
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 103 deletions.
2 changes: 1 addition & 1 deletion cmd/accounting-reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
package cmd

import (
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/cheggaaa/pb"
json "github.com/minio/mc/pkg/colorjson"
"github.com/minio/mc/pkg/probe"
)

Expand Down
41 changes: 19 additions & 22 deletions cmd/mirror-main.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,10 +437,12 @@ func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.Cance
shouldQueue = true
}
if shouldQueue || mj.isOverwrite {
mirrorURL.TotalCount = mj.TotalObjects
mirrorURL.TotalSize = mj.TotalBytes
// adjust total, because we want to show progress of the item still queued to be copied.
mj.status.SetTotal(mj.status.Total() + sourceContent.Size).Update()
// adjust total, because we want to show progress of
// the item still queued to be copied.
mj.status.SetTotal(mj.status.Get() + sourceContent.Size).Update()
mj.status.AddCounts(1)
mirrorURL.TotalSize = mj.status.Get()
mirrorURL.TotalCount = mj.status.GetCounts()
mj.statusCh <- mj.doMirror(ctx, cancelMirror, mirrorURL)
}
continue
Expand All @@ -465,10 +467,12 @@ func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.Cance
}
if shouldQueue || mj.isOverwrite {
mirrorURL.SourceContent.Size = event.Size
mirrorURL.TotalCount = mj.TotalObjects
mirrorURL.TotalSize = mj.TotalBytes
// adjust total, because we want to show progress of the itemj stiil queued to be copied.
mj.status.SetTotal(mj.status.Total() + event.Size).Update()
// adjust total, because we want to show progress
// of the itemj stiil queued to be copied.
mj.status.SetTotal(mj.status.Get() + mirrorURL.SourceContent.Size).Update()
mj.status.AddCounts(1)
mirrorURL.TotalSize = mj.status.Get()
mirrorURL.TotalCount = mj.status.GetCounts()
mj.statusCh <- mj.doMirror(ctx, cancelMirror, mirrorURL)
}
} else if event.Type == EventRemove {
Expand All @@ -479,8 +483,8 @@ func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.Cance
TargetContent: &clientContent{URL: *targetURL},
encKeyDB: mj.encKeyDB,
}
mirrorURL.TotalCount = mj.TotalObjects
mirrorURL.TotalSize = mj.TotalBytes
mirrorURL.TotalCount = mj.status.GetCounts()
mirrorURL.TotalSize = mj.status.Get()
if mirrorURL.TargetContent != nil && mj.isRemove {
mj.statusCh <- mj.doRemove(mirrorURL)
}
Expand All @@ -506,9 +510,6 @@ func (mj *mirrorJob) watchURL(sourceClient Client) *probe.Error {

// Fetch urls that need to be mirrored
func (mj *mirrorJob) startMirror(ctx context.Context, cancelMirror context.CancelFunc) {
var totalBytes int64
var totalObjects int64

stopParallel := func() {
close(mj.queueCh)
mj.parallel.wait()
Expand Down Expand Up @@ -537,19 +538,15 @@ func (mj *mirrorJob) startMirror(ctx context.Context, cancelMirror context.Cance
if mj.newerThan != "" && isNewer(sURLs.SourceContent.Time, mj.newerThan) {
continue
}
// copy
totalBytes += sURLs.SourceContent.Size
}

totalObjects++
mj.TotalBytes = totalBytes
mj.TotalObjects = totalObjects
mj.status.SetTotal(totalBytes)
mj.status.AddCounts(1)
mj.status.Add(sURLs.SourceContent.Size)

// Save total count.
sURLs.TotalCount = mj.TotalObjects
sURLs.TotalCount = mj.status.GetCounts()
// Save totalSize.
sURLs.TotalSize = mj.TotalBytes
sURLs.TotalSize = mj.status.Get()

if sURLs.SourceContent != nil {
mj.queueCh <- func() URLs {
Expand Down Expand Up @@ -629,7 +626,7 @@ func newMirrorJob(srcURL, dstURL string, isFake, isRemove, isOverwrite, isWatch,
if globalQuiet {
status = NewQuietStatus(mj.parallel)
} else if globalJSON {
status = NewDummyStatus(mj.parallel)
status = NewQuietStatus(mj.parallel)
}
mj.status = status

Expand Down
135 changes: 55 additions & 80 deletions cmd/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cmd

import (
"io"
"sync/atomic"

"github.com/minio/mc/pkg/console"
"github.com/minio/mc/pkg/probe"
Expand All @@ -26,6 +27,10 @@ import (
// Status implements a interface that can be used in quit mode or with progressbar.
type Status interface {
Println(data ...interface{})
AddCounts(int64)
SetCounts(int64)
GetCounts() int64

Add(int64) Status
Get() int64
Start()
Expand All @@ -44,86 +49,19 @@ type Status interface {
fatalIf(err *probe.Error, msg string)
}

// NewDummyStatus returns a dummy status object
func NewDummyStatus(hook io.Reader) Status {
return &DummyStatus{hook}
}

// DummyStatus will not show anything.
type DummyStatus struct {
hook io.Reader
}

// Read implements the io.Reader interface
func (ds *DummyStatus) Read(p []byte) (n int, err error) {
ds.hook.Read(p)
return len(p), nil
}

// Get implements Progress interface
func (ds *DummyStatus) Get() int64 {
return 0
}

// SetTotal sets the total of the progressbar, ignored for quietstatus
func (ds *DummyStatus) SetTotal(v int64) Status {
return ds
}

// SetCaption sets the caption of the progressbar, ignored for quietstatus
func (ds *DummyStatus) SetCaption(s string) {}

// Total returns the total number of bytes
func (ds *DummyStatus) Total() int64 {
return 0
}

// Add bytes to current number of bytes
func (ds *DummyStatus) Add(v int64) Status {
return ds
}

// Println prints line, ignored for quietstatus
func (ds *DummyStatus) Println(data ...interface{}) {}

// PrintMsg prints message
func (ds *DummyStatus) PrintMsg(msg message) {
if !globalJSON {
console.Println(msg.String())
} else {
console.Println(msg.JSON())
}
}

// Start is ignored for quietstatus
func (ds *DummyStatus) Start() {}

// Finish displays the accounting summary
func (ds *DummyStatus) Finish() {}

// Update is ignored for quietstatus
func (ds *DummyStatus) Update() {}

func (ds *DummyStatus) errorIf(err *probe.Error, msg string) {
errorIf(err, msg)
}

func (ds *DummyStatus) fatalIf(err *probe.Error, msg string) {
fatalIf(err, msg)
}

// NewQuietStatus returns a quiet status object
func NewQuietStatus(hook io.Reader) Status {
return &QuietStatus{
newAccounter(0),
hook,
accounter: newAccounter(0),
hook: hook,
}
}

// QuietStatus will only show the progress and summary
type QuietStatus struct {
*accounter
hook io.Reader
hook io.Reader
counts int64
}

// Read implements the io.Reader interface
Expand All @@ -132,6 +70,21 @@ func (qs *QuietStatus) Read(p []byte) (n int, err error) {
return qs.accounter.Read(p)
}

// SetCounts sets number of files uploaded
func (qs *QuietStatus) SetCounts(v int64) {
atomic.StoreInt64(&qs.counts, v)
}

// GetCounts returns number of files uploaded
func (qs *QuietStatus) GetCounts() int64 {
return atomic.LoadInt64(&qs.counts)
}

// AddCounts adds 'v' number of files uploaded.
func (qs *QuietStatus) AddCounts(v int64) {
atomic.AddInt64(&qs.counts, v)
}

// SetTotal sets the total of the progressbar, ignored for quietstatus
func (qs *QuietStatus) SetTotal(v int64) Status {
qs.accounter.Total = v
Expand All @@ -142,6 +95,11 @@ func (qs *QuietStatus) SetTotal(v int64) Status {
func (qs *QuietStatus) SetCaption(s string) {
}

// Get returns the current number of bytes
func (qs *QuietStatus) Get() int64 {
return qs.accounter.Get()
}

// Total returns the total number of bytes
func (qs *QuietStatus) Total() int64 {
return qs.accounter.Total
Expand All @@ -159,11 +117,7 @@ func (qs *QuietStatus) Println(data ...interface{}) {

// PrintMsg prints message
func (qs *QuietStatus) PrintMsg(msg message) {
if !globalJSON {
console.Println(msg.String())
} else {
console.Println(msg.JSON())
}
printMsg(msg)
}

// Start is ignored for quietstatus
Expand All @@ -172,7 +126,7 @@ func (qs *QuietStatus) Start() {

// Finish displays the accounting summary
func (qs *QuietStatus) Finish() {
console.Println(console.Colorize("Mirror", qs.accounter.Stat().String()))
printMsg(qs.accounter.Stat())
}

// Update is ignored for quietstatus
Expand All @@ -190,15 +144,16 @@ func (qs *QuietStatus) fatalIf(err *probe.Error, msg string) {
// NewProgressStatus returns a progress status object
func NewProgressStatus(hook io.Reader) Status {
return &ProgressStatus{
newProgressBar(0),
hook,
progressBar: newProgressBar(0),
hook: hook,
}
}

// ProgressStatus shows a progressbar
type ProgressStatus struct {
*progressBar
hook io.Reader
hook io.Reader
counts int64
}

// Read implements the io.Reader interface
Expand All @@ -212,6 +167,26 @@ func (ps *ProgressStatus) SetCaption(s string) {
ps.progressBar.SetCaption(s)
}

// SetCounts sets number of files uploaded
func (ps *ProgressStatus) SetCounts(v int64) {
atomic.StoreInt64(&ps.counts, v)
}

// GetCounts returns number of files uploaded
func (ps *ProgressStatus) GetCounts() int64 {
return atomic.LoadInt64(&ps.counts)
}

// AddCounts adds 'v' number of files uploaded.
func (ps *ProgressStatus) AddCounts(v int64) {
atomic.AddInt64(&ps.counts, v)
}

// Get returns the current number of bytes
func (ps *ProgressStatus) Get() int64 {
return ps.progressBar.Get()
}

// Total returns the total number of bytes
func (ps *ProgressStatus) Total() int64 {
return ps.progressBar.Total
Expand Down

0 comments on commit cf6defe

Please sign in to comment.