From 39a97a335112ce98eb078e3c9a67dbc1995b167e Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Sun, 27 Oct 2024 12:15:15 +0800 Subject: [PATCH 1/2] change --- br/pkg/storage/azblob.go | 64 +++++++++++++++++++++++++++++----------- 1 file changed, 47 insertions(+), 17 deletions(-) diff --git a/br/pkg/storage/azblob.go b/br/pkg/storage/azblob.go index 2ada42dffa070..d8ee26fd5713b 100644 --- a/br/pkg/storage/azblob.go +++ b/br/pkg/storage/azblob.go @@ -593,6 +593,8 @@ type azblobObjectReader struct { ctx context.Context cpkInfo *blob.CPKInfo + // opened lazily + reader io.ReadCloser } // Read implement the io.Reader interface. @@ -604,30 +606,27 @@ func (r *azblobObjectReader) Read(p []byte) (n int, err error) { if maxCnt == 0 { return 0, io.EOF } - resp, err := r.blobClient.DownloadStream(r.ctx, &blob.DownloadStreamOptions{ - Range: blob.HTTPRange{ - Offset: r.pos, - Count: maxCnt, - }, - - CPKInfo: r.cpkInfo, - }) - if err != nil { - return 0, errors.Annotatef(err, "Failed to read data from azure blob, data info: pos='%d', count='%d'", r.pos, maxCnt) + if r.reader == nil { + if err2 := r.reopenReader(); err2 != nil { + return 0, err2 + } } - body := resp.NewRetryReader(r.ctx, &blob.RetryReaderOptions{ - MaxRetries: azblobRetryTimes, - }) - n, err = body.Read(p) + buf := p[:maxCnt] + n, err = r.reader.Read(buf) if err != nil && err != io.EOF { return 0, errors.Annotatef(err, "Failed to read data from azure blob response, data info: pos='%d', count='%d'", r.pos, maxCnt) } r.pos += int64(n) - return n, body.Close() + return n, nil } // Close implement the io.Closer interface. -func (*azblobObjectReader) Close() error { +func (r *azblobObjectReader) Close() error { + if r.reader != nil { + err := errors.Trace(r.reader.Close()) + r.reader = nil + return err + } return nil } @@ -653,13 +652,44 @@ func (r *azblobObjectReader) Seek(offset int64, whence int) (int64, error) { return 0, errors.Annotatef(berrors.ErrStorageUnknown, "Seek: invalid whence '%d'", whence) } - if realOffset < 0 { + if realOffset < 0 || realOffset > r.totalSize { return 0, errors.Annotatef(berrors.ErrInvalidArgument, "Seek: offset is %d, but length of content is only %d", realOffset, r.totalSize) } + if realOffset == r.pos { + return r.pos, nil + } r.pos = realOffset + // azblob reader can only read forward, so we need to reopen the reader + if err := r.reopenReader(); err != nil { + return 0, err + } return r.pos, nil } +func (r *azblobObjectReader) reopenReader() error { + if r.reader != nil { + err := errors.Trace(r.reader.Close()) + if err != nil { + log.Warn("failed to close azblob reader", zap.Error(err)) + } + } + + resp, err := r.blobClient.DownloadStream(r.ctx, &blob.DownloadStreamOptions{ + Range: blob.HTTPRange{ + Offset: r.pos, + }, + CPKInfo: r.cpkInfo, + }) + if err != nil { + return errors.Annotatef(err, "Failed to read data from azure blob, data info: pos='%d'", r.pos) + } + body := resp.NewRetryReader(r.ctx, &blob.RetryReaderOptions{ + MaxRetries: azblobRetryTimes, + }) + r.reader = body + return nil +} + func (r *azblobObjectReader) GetFileSize() (int64, error) { return r.totalSize, nil } From c63efc809cb6362b1c4bd488e6b2fe414cc5b67f Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Sun, 27 Oct 2024 12:38:57 +0800 Subject: [PATCH 2/2] change --- br/pkg/storage/azblob_test.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/br/pkg/storage/azblob_test.go b/br/pkg/storage/azblob_test.go index 75313dd27e9d7..99f5dd41574aa 100644 --- a/br/pkg/storage/azblob_test.go +++ b/br/pkg/storage/azblob_test.go @@ -62,7 +62,7 @@ func TestAzblob(t *testing.T) { builder := &sharedKeyAzuriteClientBuilder{} skip, err := createContainer(ctx, builder, options.Bucket) if skip || err != nil { - t.Log("azurite is not running, skip test") + t.Skip("azurite is not running, skip test") return } require.NoError(t, err) @@ -123,23 +123,34 @@ func TestAzblob(t *testing.T) { efr, err := azblobStorage.Open(ctx, "key2", nil) require.NoError(t, err) + size, err := efr.GetFileSize() + require.NoError(t, err) + require.EqualValues(t, 33, size) + + realReader := efr.(*azblobObjectReader) + require.Nil(t, realReader.reader) p := make([]byte, 10) n, err := efr.Read(p) require.NoError(t, err) require.Equal(t, 10, n) require.Equal(t, "data222233", string(p)) + require.NotNil(t, realReader.reader) + oldInnerReader := realReader.reader p = make([]byte, 40) n, err = efr.Read(p) require.NoError(t, err) require.Equal(t, 23, n) require.Equal(t, "46757222222222289722222", string(p[:23])) + require.Same(t, oldInnerReader, realReader.reader) p = make([]byte, 5) offs, err := efr.Seek(3, io.SeekStart) require.NoError(t, err) require.Equal(t, int64(3), offs) + // reader reopened + require.NotSame(t, oldInnerReader, realReader.reader) n, err = efr.Read(p) require.NoError(t, err)