Skip to content

Commit

Permalink
COS: Fix tools replicate (#5166)
Browse files Browse the repository at this point in the history
* objstore: fix TryToGetSize err when the reader wrappered by TimingReadCloser

Signed-off-by: Jimmie Han <hanjinming@outlook.com>

* COS: Add size info into reader for pass it to Upload function.

Signed-off-by: Jimmie Han <hanjinming@outlook.com>

* Fix typo

Co-authored-by: Matej Gera <38492574+matej-g@users.noreply.github.com>
Signed-off-by: Jimmie Han <hanjinming@outlook.com>

Co-authored-by: Matej Gera <38492574+matej-g@users.noreply.github.com>
  • Loading branch information
hanjm and matej-g authored Feb 25, 2022
1 parent cb2dc7f commit 7528375
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 1 deletion.
13 changes: 12 additions & 1 deletion pkg/objstore/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,19 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (
runutil.ExhaustCloseWithLogOnErr(b.logger, resp.Body, "cos get range obj close")
return nil, err
}
// Add size info into reader to pass it to Upload function.
r := objectSizerReadCloser{ReadCloser: resp.Body, size: resp.ContentLength}
return r, nil
}

type objectSizerReadCloser struct {
io.ReadCloser
size int64
}

return resp.Body, nil
// ObjectSize implement objstore.ObjectSizer.
func (o objectSizerReadCloser) ObjectSize() (int64, error) {
return o.size, nil
}

// Get returns a reader for the given object name.
Expand Down
9 changes: 9 additions & 0 deletions pkg/objstore/objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,8 @@ func (b *metricBucket) Name() string {

type timingReadCloser struct {
io.ReadCloser
objSize int64
objSizeErr error

alreadyGotErr bool

Expand All @@ -506,8 +508,11 @@ func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramV
// Initialize the metrics with 0.
dur.WithLabelValues(op)
failed.WithLabelValues(op)
objSize, objSizeErr := TryToGetSize(rc)
return &timingReadCloser{
ReadCloser: rc,
objSize: objSize,
objSizeErr: objSizeErr,
start: time.Now(),
op: op,
duration: dur,
Expand All @@ -516,6 +521,10 @@ func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramV
}
}

func (t *timingReadCloser) ObjectSize() (int64, error) {
return t.objSize, t.objSizeErr
}

func (rc *timingReadCloser) Close() error {
err := rc.ReadCloser.Close()
if !rc.alreadyGotErr && err != nil {
Expand Down
27 changes: 27 additions & 0 deletions pkg/objstore/objstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,30 @@ func TestTracingReader(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, int64(11), size)
}

func TestTimingTracingReader(t *testing.T) {
m := BucketWithMetrics("", NewInMemBucket(), nil)
r := bytes.NewReader([]byte("hello world"))

tr := NopCloserWithSize(r)
tr = newTimingReadCloser(tr, "", m.opsDuration, m.opsFailures, func(err error) bool {
return false
})
tr = newTracingReadCloser(tr, nil)

size, err := TryToGetSize(tr)

testutil.Ok(t, err)
testutil.Equals(t, int64(11), size)

smallBuf := make([]byte, 4)
n, err := io.ReadFull(tr, smallBuf)
testutil.Ok(t, err)
testutil.Equals(t, 4, n)

// Verify that size is still the same, after reading 4 bytes.
size, err = TryToGetSize(tr)

testutil.Ok(t, err)
testutil.Equals(t, int64(11), size)
}

0 comments on commit 7528375

Please sign in to comment.