From 3cfdf2887cc2148a26ab07754c5b3d73b6abc689 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Mon, 6 Jan 2020 21:32:53 +0000 Subject: [PATCH] storage/cloud: split out implementations into own files This is change is only moving code, and only what files that code is in -- no behavior or even symbols are changed. This is just splitting up the big 'external_storage.go' that previously contained both the common interface and all the implementaitons of it, to only contain the interface and common helpers, with each implementaiton now in its own file. Tests are similarly split. Release note: none. --- pkg/storage/cloud/azure_storage.go | 153 +++ pkg/storage/cloud/azure_storage_test.go | 51 + pkg/storage/cloud/external_storage.go | 1162 +------------------ pkg/storage/cloud/external_storage_test.go | 440 ------- pkg/storage/cloud/gcs_storage.go | 210 ++++ pkg/storage/cloud/http_storage.go | 378 ++++++ pkg/storage/cloud/http_storage_test.go | 285 +++++ pkg/storage/cloud/nodelocal_storage.go | 122 ++ pkg/storage/cloud/nodelocal_storage_test.go | 67 ++ pkg/storage/cloud/s3_storage.go | 224 ++++ pkg/storage/cloud/s3_storage_test.go | 128 ++ pkg/storage/cloud/workload_storage.go | 141 +++ pkg/testutils/lint/lint_test.go | 2 +- 13 files changed, 1801 insertions(+), 1562 deletions(-) create mode 100644 pkg/storage/cloud/azure_storage.go create mode 100644 pkg/storage/cloud/azure_storage_test.go create mode 100644 pkg/storage/cloud/gcs_storage.go create mode 100644 pkg/storage/cloud/http_storage.go create mode 100644 pkg/storage/cloud/http_storage_test.go create mode 100644 pkg/storage/cloud/nodelocal_storage.go create mode 100644 pkg/storage/cloud/nodelocal_storage_test.go create mode 100644 pkg/storage/cloud/s3_storage.go create mode 100644 pkg/storage/cloud/s3_storage_test.go create mode 100644 pkg/storage/cloud/workload_storage.go diff --git a/pkg/storage/cloud/azure_storage.go b/pkg/storage/cloud/azure_storage.go new file mode 100644 index 000000000000..5c6b5fe6f158 --- /dev/null +++ b/pkg/storage/cloud/azure_storage.go @@ -0,0 +1,153 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package cloud + +import ( + "context" + "fmt" + "io" + "net/url" + "path/filepath" + "strings" + + "github.com/Azure/azure-storage-blob-go/azblob" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/contextutil" + "github.com/cockroachdb/errors" +) + +type azureStorage struct { + conf *roachpb.ExternalStorage_Azure + container azblob.ContainerURL + prefix string + settings *cluster.Settings +} + +var _ ExternalStorage = &azureStorage{} + +func makeAzureStorage( + conf *roachpb.ExternalStorage_Azure, settings *cluster.Settings, +) (ExternalStorage, error) { + if conf == nil { + return nil, errors.Errorf("azure upload requested but info missing") + } + credential, err := azblob.NewSharedKeyCredential(conf.AccountName, conf.AccountKey) + if err != nil { + return nil, errors.Wrap(err, "azure credential") + } + p := azblob.NewPipeline(credential, azblob.PipelineOptions{}) + u, err := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", conf.AccountName)) + if err != nil { + return nil, errors.Wrap(err, "azure: account name is not valid") + } + serviceURL := azblob.NewServiceURL(*u, p) + return &azureStorage{ + conf: conf, + container: serviceURL.NewContainerURL(conf.Container), + prefix: conf.Prefix, + settings: settings, + }, nil +} + +func (s *azureStorage) getBlob(basename string) azblob.BlockBlobURL { + name := filepath.Join(s.prefix, basename) + return s.container.NewBlockBlobURL(name) +} + +func (s *azureStorage) Conf() roachpb.ExternalStorage { + return roachpb.ExternalStorage{ + Provider: roachpb.ExternalStorageProvider_Azure, + AzureConfig: s.conf, + } +} + +func (s *azureStorage) WriteFile( + ctx context.Context, basename string, content io.ReadSeeker, +) error { + err := contextutil.RunWithTimeout(ctx, "write azure file", timeoutSetting.Get(&s.settings.SV), + func(ctx context.Context) error { + blob := s.getBlob(basename) + _, err := blob.Upload( + ctx, content, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}, + ) + return err + }) + return errors.Wrapf(err, "write file: %s", basename) +} + +func (s *azureStorage) ReadFile(ctx context.Context, basename string) (io.ReadCloser, error) { + // https://github.com/cockroachdb/cockroach/issues/23859 + blob := s.getBlob(basename) + get, err := blob.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false) + if err != nil { + return nil, errors.Wrap(err, "failed to create azure reader") + } + reader := get.Body(azblob.RetryReaderOptions{MaxRetryRequests: 3}) + return reader, nil +} + +func (s *azureStorage) ListFiles(ctx context.Context) ([]string, error) { + var fileList []string + response, err := s.container.ListBlobsFlatSegment(ctx, + azblob.Marker{}, + azblob.ListBlobsSegmentOptions{Prefix: getBucketBeforeWildcard(s.prefix)}, + ) + if err != nil { + return nil, errors.Wrap(err, "unable to list files for specified blob") + } + + for _, blob := range response.Segment.BlobItems { + matches, err := filepath.Match(s.prefix, blob.Name) + if err != nil { + continue + } + if matches { + azureURL := url.URL{ + Scheme: "azure", + Host: strings.TrimPrefix(s.container.URL().Path, "/"), + Path: blob.Name, + } + fileList = append(fileList, azureURL.String()) + } + } + + return fileList, nil +} + +func (s *azureStorage) Delete(ctx context.Context, basename string) error { + err := contextutil.RunWithTimeout(ctx, "delete azure file", timeoutSetting.Get(&s.settings.SV), + func(ctx context.Context) error { + blob := 
s.getBlob(basename) + _, err := blob.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}) + return err + }) + return errors.Wrap(err, "delete file") +} + +func (s *azureStorage) Size(ctx context.Context, basename string) (int64, error) { + var props *azblob.BlobGetPropertiesResponse + err := contextutil.RunWithTimeout(ctx, "size azure file", timeoutSetting.Get(&s.settings.SV), + func(ctx context.Context) error { + blob := s.getBlob(basename) + var err error + props, err = blob.GetProperties(ctx, azblob.BlobAccessConditions{}) + return err + }) + if err != nil { + return 0, errors.Wrap(err, "get file properties") + } + return props.ContentLength(), nil +} + +func (s *azureStorage) Close() error { + return nil +} diff --git a/pkg/storage/cloud/azure_storage_test.go b/pkg/storage/cloud/azure_storage_test.go new file mode 100644 index 000000000000..0a5df037f09c --- /dev/null +++ b/pkg/storage/cloud/azure_storage_test.go @@ -0,0 +1,51 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package cloud + +import ( + "fmt" + "net/url" + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/util/leaktest" +) + +func TestPutAzure(t *testing.T) { + defer leaktest.AfterTest(t)() + + accountName := os.Getenv("AZURE_ACCOUNT_NAME") + accountKey := os.Getenv("AZURE_ACCOUNT_KEY") + if accountName == "" || accountKey == "" { + t.Skip("AZURE_ACCOUNT_NAME and AZURE_ACCOUNT_KEY env vars must be set") + } + bucket := os.Getenv("AZURE_CONTAINER") + if bucket == "" { + t.Skip("AZURE_CONTAINER env var must be set") + } + + testExportStore(t, + fmt.Sprintf("azure://%s/%s?%s=%s&%s=%s", + bucket, "backup-test", + AzureAccountNameParam, url.QueryEscape(accountName), + AzureAccountKeyParam, url.QueryEscape(accountKey), + ), + false, + ) + testListFiles( + t, + fmt.Sprintf("azure://%s/%s?%s=%s&%s=%s", + bucket, "listing-test", + AzureAccountNameParam, url.QueryEscape(accountName), + AzureAccountKeyParam, url.QueryEscape(accountKey), + ), + ) +} diff --git a/pkg/storage/cloud/external_storage.go b/pkg/storage/cloud/external_storage.go index 6994cd6f4bb5..67ce9d981ea8 100644 --- a/pkg/storage/cloud/external_storage.go +++ b/pkg/storage/cloud/external_storage.go @@ -12,42 +12,21 @@ package cloud import ( "context" - "crypto/tls" - "crypto/x509" - "encoding/base64" - "fmt" - "hash/fnv" "io" - "io/ioutil" - "net/http" "net/url" - "path" - "path/filepath" "strconv" "strings" "time" - gcs "cloud.google.com/go/storage" - "github.com/Azure/azure-storage-blob-go/azblob" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" - "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/blobs" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/util/contextutil" - "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" - "github.com/cockroachdb/cockroach/pkg/util/sysutil" - 
"github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/errors" - "golang.org/x/oauth2/google" - "google.golang.org/api/iterator" - "google.golang.org/api/option" ) const ( @@ -97,6 +76,47 @@ const ( cloudStorageTimeout = cloudstoragePrefix + ".timeout" ) +// ExternalStorageFactory describes a factory function for ExternalStorage. +type ExternalStorageFactory func(ctx context.Context, dest roachpb.ExternalStorage) (ExternalStorage, error) + +// ExternalStorageFromURIFactory describes a factory function for ExternalStorage given a URI. +type ExternalStorageFromURIFactory func(ctx context.Context, uri string) (ExternalStorage, error) + +// ExternalStorage provides functions to read and write files in some storage, +// namely various cloud storage providers, for example to store backups. +// Generally an implementation is instantiated pointing to some base path or +// prefix and then gets and puts files using the various methods to interact +// with individual files contained within that path or prefix. +// However, implementations must also allow callers to provide the full path to +// a given file as the "base" path, and then read or write it with the methods +// below by simply passing an empty filename. Implementations that use stdlib's +// `filepath.Join` to concatenate their base path with the provided filename will +// find its semantics well suited to this -- it elides empty components and does +// not append surplus slashes. +type ExternalStorage interface { + io.Closer + + // Conf should return the serializable configuration required to reconstruct + // this ExternalStorage implementation. + Conf() roachpb.ExternalStorage + + // ReadFile should return a Reader for requested name. + ReadFile(ctx context.Context, basename string) (io.ReadCloser, error) + + // WriteFile should write the content to requested name. + WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error + + // ListFiles should treat the ExternalStorage URI as a glob + // pattern, and return a list of files that match the pattern. + ListFiles(ctx context.Context) ([]string, error) + + // Delete removes the named file from the store. + Delete(ctx context.Context, basename string) error + + // Size returns the length of the named file in bytes. + Size(ctx context.Context, basename string) (int64, error) +} + // ExternalStorageConfFromURI generates an ExternalStorage config from a URI string. func ExternalStorageConfFromURI(path string) (roachpb.ExternalStorage, error) { conf := roachpb.ExternalStorage{} @@ -251,47 +271,6 @@ func URINeedsGlobExpansion(uri string) bool { return strings.ContainsAny(parsedURI.Path, "*?[") } -// ExternalStorageFactory describes a factory function for ExternalStorage. -type ExternalStorageFactory func(ctx context.Context, dest roachpb.ExternalStorage) (ExternalStorage, error) - -// ExternalStorageFromURIFactory describes a factory function for ExternalStorage given a URI. -type ExternalStorageFromURIFactory func(ctx context.Context, uri string) (ExternalStorage, error) - -// ExternalStorage provides functions to read and write files in some storage, -// namely various cloud storage providers, for example to store backups. -// Generally an implementation is instantiated pointing to some base path or -// prefix and then gets and puts files using the various methods to interact -// with individual files contained within that path or prefix. 
-// However, implementations must also allow callers to provide the full path to -// a given file as the "base" path, and then read or write it with the methods -// below by simply passing an empty filename. Implementations that use stdlib's -// `filepath.Join` to concatenate their base path with the provided filename will -// find its semantics well suited to this -- it elides empty components and does -// not append surplus slashes. -type ExternalStorage interface { - io.Closer - - // Conf should return the serializable configuration required to reconstruct - // this ExternalStorage implementation. - Conf() roachpb.ExternalStorage - - // ReadFile should return a Reader for requested name. - ReadFile(ctx context.Context, basename string) (io.ReadCloser, error) - - // WriteFile should write the content to requested name. - WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error - - // ListFiles should treat the ExternalStorage URI as a glob - // pattern, and return a list of files that match the pattern. - ListFiles(ctx context.Context) ([]string, error) - - // Delete removes the named file from the store. - Delete(ctx context.Context, basename string) error - - // Size returns the length of the named file in bytes. - Size(ctx context.Context, basename string) (int64, error) -} - var ( gcsDefault = settings.RegisterPublicStringSetting( cloudstorageGSDefaultKey, @@ -309,456 +288,6 @@ var ( 10*time.Minute) ) -type localFileStorage struct { - cfg roachpb.ExternalStorage_LocalFilePath // contains un-prefixed filepath -- DO NOT use for I/O ops. - base string // relative filepath prefixed with externalIODir, for I/O ops on this node. - blobClient blobs.BlobClient // inter-node file sharing service -} - -var _ ExternalStorage = &localFileStorage{} - -// MakeLocalStorageURI converts a local path (should always be relative) to a -// valid nodelocal URI. -func MakeLocalStorageURI(path string) string { - return fmt.Sprintf("nodelocal:///%s", path) -} - -func makeNodeLocalURIWithNodeID(nodeID roachpb.NodeID, path string) string { - path = strings.TrimPrefix(path, "/") - if nodeID == 0 { - return fmt.Sprintf("nodelocal:///%s", path) - } - return fmt.Sprintf("nodelocal://%d/%s", nodeID, path) -} - -func makeLocalStorage( - ctx context.Context, - cfg roachpb.ExternalStorage_LocalFilePath, - settings *cluster.Settings, - blobClientFactory blobs.BlobClientFactory, -) (ExternalStorage, error) { - if cfg.Path == "" { - return nil, errors.Errorf("Local storage requested but path not provided") - } - client, err := blobClientFactory(ctx, cfg.NodeID) - if err != nil { - return nil, errors.Wrap(err, "failed to create blob client") - } - - // In non-server execution we have no settings and no restriction on local IO. - if settings != nil { - if settings.ExternalIODir == "" { - return nil, errors.Errorf("local file access is disabled") - } - } - return &localFileStorage{base: cfg.Path, cfg: cfg, blobClient: client}, nil -} - -func (l *localFileStorage) Conf() roachpb.ExternalStorage { - return roachpb.ExternalStorage{ - Provider: roachpb.ExternalStorageProvider_LocalFile, - LocalFile: l.cfg, - } -} - -func joinRelativePath(filePath string, file string) string { - // Joining "." to make this a relative path. - // This ensures path.Clean does not simplify in unexpected ways. 
- return path.Join(".", filePath, file) -} - -func (l *localFileStorage) WriteFile( - ctx context.Context, basename string, content io.ReadSeeker, -) error { - return l.blobClient.WriteFile(ctx, joinRelativePath(l.base, basename), content) -} - -func (l *localFileStorage) ReadFile(ctx context.Context, basename string) (io.ReadCloser, error) { - return l.blobClient.ReadFile(ctx, joinRelativePath(l.base, basename)) -} - -func (l *localFileStorage) ListFiles(ctx context.Context) ([]string, error) { - var fileList []string - matches, err := l.blobClient.List(ctx, l.base) - if err != nil { - return nil, errors.Wrap(err, "unable to match pattern provided") - } - - for _, fileName := range matches { - fileList = append(fileList, makeNodeLocalURIWithNodeID(l.cfg.NodeID, fileName)) - } - - return fileList, nil -} - -func (l *localFileStorage) Delete(ctx context.Context, basename string) error { - return l.blobClient.Delete(ctx, joinRelativePath(l.base, basename)) -} - -func (l *localFileStorage) Size(ctx context.Context, basename string) (int64, error) { - stat, err := l.blobClient.Stat(ctx, joinRelativePath(l.base, basename)) - if err != nil { - return 0, err - } - return stat.Filesize, nil -} - -func (*localFileStorage) Close() error { - return nil -} - -type httpStorage struct { - base *url.URL - client *http.Client - hosts []string - settings *cluster.Settings -} - -var _ ExternalStorage = &httpStorage{} - -type retryableHTTPError struct { - cause error -} - -func (e *retryableHTTPError) Error() string { - return fmt.Sprintf("retryable http error: %s", e.cause) -} - -// package visible for test. -var httpRetryOptions = retry.Options{ - InitialBackoff: 100 * time.Millisecond, - MaxBackoff: 2 * time.Second, - MaxRetries: 32, - Multiplier: 4, -} - -func makeHTTPClient(settings *cluster.Settings) (*http.Client, error) { - var tlsConf *tls.Config - if pem := httpCustomCA.Get(&settings.SV); pem != "" { - roots, err := x509.SystemCertPool() - if err != nil { - return nil, errors.Wrap(err, "could not load system root CA pool") - } - if !roots.AppendCertsFromPEM([]byte(pem)) { - return nil, errors.Errorf("failed to parse root CA certificate from %q", pem) - } - tlsConf = &tls.Config{RootCAs: roots} - } - // Copy the defaults from http.DefaultTransport. We cannot just copy the - // entire struct because it has a sync Mutex. This has the unfortunate problem - // that if Go adds fields to DefaultTransport they won't be copied here, - // but this is ok for now. - t := http.DefaultTransport.(*http.Transport) - return &http.Client{Transport: &http.Transport{ - Proxy: t.Proxy, - DialContext: t.DialContext, - MaxIdleConns: t.MaxIdleConns, - IdleConnTimeout: t.IdleConnTimeout, - TLSHandshakeTimeout: t.TLSHandshakeTimeout, - ExpectContinueTimeout: t.ExpectContinueTimeout, - - // Add our custom CA. 
- TLSClientConfig: tlsConf, - }}, nil -} - -func makeHTTPStorage(base string, settings *cluster.Settings) (ExternalStorage, error) { - if base == "" { - return nil, errors.Errorf("HTTP storage requested but base path not provided") - } - - client, err := makeHTTPClient(settings) - if err != nil { - return nil, err - } - uri, err := url.Parse(base) - if err != nil { - return nil, err - } - return &httpStorage{ - base: uri, - client: client, - hosts: strings.Split(uri.Host, ","), - settings: settings, - }, nil -} - -func (h *httpStorage) Conf() roachpb.ExternalStorage { - return roachpb.ExternalStorage{ - Provider: roachpb.ExternalStorageProvider_Http, - HttpPath: roachpb.ExternalStorage_Http{ - BaseUri: h.base.String(), - }, - } -} - -type resumingHTTPReader struct { - body io.ReadCloser - canResume bool // Can we resume if download aborts prematurely? - pos int64 // How much data was received so far. - ctx context.Context - url string - client *httpStorage -} - -var _ io.ReadCloser = &resumingHTTPReader{} - -func newResumingHTTPReader( - ctx context.Context, client *httpStorage, url string, -) (*resumingHTTPReader, error) { - r := &resumingHTTPReader{ - ctx: ctx, - client: client, - url: url, - } - - resp, err := r.sendRequest(nil) - if err != nil { - return nil, err - } - - r.canResume = resp.Header.Get("Accept-Ranges") == "bytes" - r.body = resp.Body - return r, nil -} - -func (r *resumingHTTPReader) Close() error { - if r.body != nil { - return r.body.Close() - } - return nil -} - -// checkHTTPContentRangeHeader parses Content-Range header and -// ensures that range start offset is the same as 'pos' -// See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Range -func checkHTTPContentRangeHeader(h string, pos int64) error { - if len(h) == 0 { - return errors.New("http server does not honor download resume") - } - - h = strings.TrimPrefix(h, "bytes ") - dash := strings.IndexByte(h, '-') - if dash <= 0 { - return errors.Errorf("malformed Content-Range header: %s", h) - } - - resume, err := strconv.ParseInt(h[:dash], 10, 64) - if err != nil { - return errors.Errorf("malformed start offset in Content-Range header: %s", h) - } - - if resume != pos { - return errors.Errorf( - "expected resume position %d, found %d instead in Content-Range header: %s", - pos, resume, h) - } - return nil -} - -func (r *resumingHTTPReader) sendRequest( - reqHeaders map[string]string, -) (resp *http.Response, err error) { - // Initialize err to the context.Canceled: if our context is canceled, we will - // never enter the loop below; in this case we want to return "nil, canceled" - err = context.Canceled - - for attempt, retries := 0, - retry.StartWithCtx(r.ctx, httpRetryOptions); retries.Next(); attempt++ { - resp, err = r.client.req(r.ctx, "GET", r.url, nil, reqHeaders) - - if err == nil { - return - } - - log.Errorf(r.ctx, "HTTP:Req error: err=%s (attempt %d)", err, attempt) - - if _, ok := err.(*retryableHTTPError); !ok { - return - } - } - - return -} - -// requestNextRanges issues additional http request -// to continue downloading next range of bytes. 
-func (r *resumingHTTPReader) requestNextRange() (err error) { - if err := r.body.Close(); err != nil { - return err - } - - r.body = nil - var resp *http.Response - resp, err = r.sendRequest(map[string]string{"Range": fmt.Sprintf("bytes=%d-", r.pos)}) - - if err == nil { - err = checkHTTPContentRangeHeader(resp.Header.Get("Content-Range"), r.pos) - } - - if err == nil { - r.body = resp.Body - } - return -} - -// isResumableHTTPError returns true if we can -// resume download after receiving an error 'err'. -// We can attempt to resume download if the error is ErrUnexpectedEOF. -// In particular, we should not worry about a case when error is io.EOF. -// The reason for this is two-fold: -// 1. The underlying http library converts io.EOF to io.ErrUnexpectedEOF -// if the number of bytes transferred is less than the number of -// bytes advertised in the Content-Length header. So if we see -// io.ErrUnexpectedEOF we can simply request the next range. -// 2. If the server did *not* advertise Content-Length, then -// there is really nothing we can do: http standard says that -// the stream ends when the server terminates connection. -// In addition, we treat connection reset by peer errors (which can -// happen if we didn't read from the connection too long due to e.g. load), -// the same was as unexpected eof errors. -func isResumableHTTPError(err error) bool { - return errors.Is(err, io.ErrUnexpectedEOF) || sysutil.IsErrConnectionReset(err) -} - -const maxNoProgressReads = 3 - -// Read implements io.Reader interface to read the data from the underlying -// http stream, issuing additional requests in case download is interrupted. -func (r *resumingHTTPReader) Read(p []byte) (n int, err error) { - for retries := 0; n == 0 && err == nil; retries++ { - n, err = r.body.Read(p) - r.pos += int64(n) - - if err != nil && !errors.IsAny(err, io.EOF, io.ErrUnexpectedEOF) { - log.Errorf(r.ctx, "HTTP:Read err: %s", err) - } - - // Resume download if the http server supports this. 
- if r.canResume && isResumableHTTPError(err) { - log.Errorf(r.ctx, "HTTP:Retry: error %s", err) - if retries > maxNoProgressReads { - err = errors.Wrap(err, "multiple Read calls return no data") - return - } - err = r.requestNextRange() - } - } - - return -} - -func (h *httpStorage) ReadFile(ctx context.Context, basename string) (io.ReadCloser, error) { - // https://github.com/cockroachdb/cockroach/issues/23859 - return newResumingHTTPReader(ctx, h, basename) -} - -func (h *httpStorage) WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error { - return contextutil.RunWithTimeout(ctx, fmt.Sprintf("PUT %s", basename), - timeoutSetting.Get(&h.settings.SV), func(ctx context.Context) error { - _, err := h.reqNoBody(ctx, "PUT", basename, content) - return err - }) -} - -func (h *httpStorage) ListFiles(_ context.Context) ([]string, error) { - return nil, errors.New(`http storage does not support listing files`) -} - -func (h *httpStorage) Delete(ctx context.Context, basename string) error { - return contextutil.RunWithTimeout(ctx, fmt.Sprintf("DELETE %s", basename), - timeoutSetting.Get(&h.settings.SV), func(ctx context.Context) error { - _, err := h.reqNoBody(ctx, "DELETE", basename, nil) - return err - }) -} - -func (h *httpStorage) Size(ctx context.Context, basename string) (int64, error) { - var resp *http.Response - if err := contextutil.RunWithTimeout(ctx, fmt.Sprintf("HEAD %s", basename), - timeoutSetting.Get(&h.settings.SV), func(ctx context.Context) error { - var err error - resp, err = h.reqNoBody(ctx, "HEAD", basename, nil) - return err - }); err != nil { - return 0, err - } - if resp.ContentLength < 0 { - return 0, errors.Errorf("bad ContentLength: %d", resp.ContentLength) - } - return resp.ContentLength, nil -} - -func (h *httpStorage) Close() error { - return nil -} - -// reqNoBody is like req but it closes the response body. -func (h *httpStorage) reqNoBody( - ctx context.Context, method, file string, body io.Reader, -) (*http.Response, error) { - resp, err := h.req(ctx, method, file, body, nil) - if resp != nil { - resp.Body.Close() - } - return resp, err -} - -func (h *httpStorage) req( - ctx context.Context, method, file string, body io.Reader, headers map[string]string, -) (*http.Response, error) { - dest := *h.base - if hosts := len(h.hosts); hosts > 1 { - if file == "" { - return nil, errors.New("cannot use a multi-host HTTP basepath for single file") - } - hash := fnv.New32a() - if _, err := hash.Write([]byte(file)); err != nil { - panic(errors.Wrap(err, `"It never returns an error." -- https://golang.org/pkg/hash`)) - } - dest.Host = h.hosts[int(hash.Sum32())%hosts] - } - dest.Path = filepath.Join(dest.Path, file) - url := dest.String() - req, err := http.NewRequest(method, url, body) - - if err != nil { - return nil, errors.Wrapf(err, "error constructing request %s %q", method, url) - } - req = req.WithContext(ctx) - - for key, val := range headers { - req.Header.Add(key, val) - } - - resp, err := h.client.Do(req) - if err != nil { - // We failed to establish connection to the server (we don't even have - // a response object/server response code). Those errors (e.g. due to - // network blip, or DNS resolution blip, etc) are usually transient. The - // client may choose to retry the request few times before giving up. - return nil, &retryableHTTPError{err} - } - - switch resp.StatusCode { - case 200, 201, 204, 206: - // Pass. 
- default: - body, _ := ioutil.ReadAll(resp.Body) - _ = resp.Body.Close() - return nil, errors.Errorf("error response from server: %s %q", resp.Status, body) - } - return resp, nil -} - -type s3Storage struct { - bucket *string - conf *roachpb.ExternalStorage_S3 - prefix string - s3 *s3.S3 - settings *cluster.Settings -} - // delayedRetry runs fn and re-runs it a limited number of times if it // fails. It knows about specific kinds of errors that need longer retry // delays than normal. @@ -792,612 +321,3 @@ func delayedRetry(ctx context.Context, fn func() error) error { return err }) } - -var _ ExternalStorage = &s3Storage{} - -func makeS3Storage( - ctx context.Context, conf *roachpb.ExternalStorage_S3, settings *cluster.Settings, -) (ExternalStorage, error) { - if conf == nil { - return nil, errors.Errorf("s3 upload requested but info missing") - } - region := conf.Region - config := conf.Keys() - if conf.Endpoint != "" { - config.Endpoint = &conf.Endpoint - if conf.Region == "" { - region = "default-region" - } - client, err := makeHTTPClient(settings) - if err != nil { - return nil, err - } - config.HTTPClient = client - } - - // "specified": use credentials provided in URI params; error if not present. - // "implicit": enable SharedConfig, which loads in credentials from environment. - // Detailed in https://docs.aws.amazon.com/sdk-for-go/api/aws/session/ - // "": default to `specified`. - opts := session.Options{} - switch conf.Auth { - case "", authParamSpecified: - if conf.AccessKey == "" { - return nil, errors.Errorf( - "%s is set to '%s', but %s is not set", - AuthParam, - authParamSpecified, - S3AccessKeyParam, - ) - } - if conf.Secret == "" { - return nil, errors.Errorf( - "%s is set to '%s', but %s is not set", - AuthParam, - authParamSpecified, - S3SecretParam, - ) - } - opts.Config.MergeIn(config) - case authParamImplicit: - opts.SharedConfigState = session.SharedConfigEnable - default: - return nil, errors.Errorf("unsupported value %s for %s", conf.Auth, AuthParam) - } - - sess, err := session.NewSessionWithOptions(opts) - if err != nil { - return nil, errors.Wrap(err, "new aws session") - } - if region == "" { - err = delayedRetry(ctx, func() error { - var err error - region, err = s3manager.GetBucketRegion(ctx, sess, conf.Bucket, "us-east-1") - return err - }) - if err != nil { - return nil, errors.Wrap(err, "could not find s3 bucket's region") - } - } - sess.Config.Region = aws.String(region) - if conf.Endpoint != "" { - sess.Config.S3ForcePathStyle = aws.Bool(true) - } - return &s3Storage{ - bucket: aws.String(conf.Bucket), - conf: conf, - prefix: conf.Prefix, - s3: s3.New(sess), - settings: settings, - }, nil -} - -func (s *s3Storage) Conf() roachpb.ExternalStorage { - return roachpb.ExternalStorage{ - Provider: roachpb.ExternalStorageProvider_S3, - S3Config: s.conf, - } -} - -func (s *s3Storage) WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error { - err := contextutil.RunWithTimeout(ctx, "put s3 object", - timeoutSetting.Get(&s.settings.SV), - func(ctx context.Context) error { - _, err := s.s3.PutObjectWithContext(ctx, &s3.PutObjectInput{ - Bucket: s.bucket, - Key: aws.String(filepath.Join(s.prefix, basename)), - Body: content, - }) - return err - }) - return errors.Wrap(err, "failed to put s3 object") -} - -func (s *s3Storage) ReadFile(ctx context.Context, basename string) (io.ReadCloser, error) { - // https://github.com/cockroachdb/cockroach/issues/23859 - out, err := s.s3.GetObjectWithContext(ctx, &s3.GetObjectInput{ - Bucket: s.bucket, - 
Key: aws.String(filepath.Join(s.prefix, basename)), - }) - if err != nil { - return nil, errors.Wrap(err, "failed to get s3 object") - } - return out.Body, nil -} - -func getBucketBeforeWildcard(path string) string { - globIndex := strings.IndexAny(path, "*?[") - if globIndex < 0 { - return path - } - return filepath.Dir(path[:globIndex]) -} - -func (s *s3Storage) ListFiles(ctx context.Context) ([]string, error) { - var fileList []string - baseBucket := getBucketBeforeWildcard(*s.bucket) - - err := s.s3.ListObjectsPagesWithContext( - ctx, - &s3.ListObjectsInput{ - Bucket: &baseBucket, - }, - func(page *s3.ListObjectsOutput, lastPage bool) bool { - for _, fileObject := range page.Contents { - matches, err := filepath.Match(s.prefix, *fileObject.Key) - if err != nil { - continue - } - if matches { - s3URL := url.URL{ - Scheme: "s3", - Host: *s.bucket, - Path: *fileObject.Key, - } - fileList = append(fileList, s3URL.String()) - } - } - return !lastPage - }, - ) - if err != nil { - return nil, errors.Wrap(err, `failed to list s3 bucket`) - } - - return fileList, nil -} - -func (s *s3Storage) Delete(ctx context.Context, basename string) error { - return contextutil.RunWithTimeout(ctx, "delete s3 object", - timeoutSetting.Get(&s.settings.SV), - func(ctx context.Context) error { - _, err := s.s3.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{ - Bucket: s.bucket, - Key: aws.String(filepath.Join(s.prefix, basename)), - }) - return err - }) -} - -func (s *s3Storage) Size(ctx context.Context, basename string) (int64, error) { - var out *s3.HeadObjectOutput - err := contextutil.RunWithTimeout(ctx, "get s3 object header", - timeoutSetting.Get(&s.settings.SV), - func(ctx context.Context) error { - var err error - out, err = s.s3.HeadObjectWithContext(ctx, &s3.HeadObjectInput{ - Bucket: s.bucket, - Key: aws.String(filepath.Join(s.prefix, basename)), - }) - return err - }) - if err != nil { - return 0, errors.Wrap(err, "failed to get s3 object headers") - } - return *out.ContentLength, nil -} - -func (s *s3Storage) Close() error { - return nil -} - -type gcsStorage struct { - bucket *gcs.BucketHandle - client *gcs.Client - conf *roachpb.ExternalStorage_GCS - prefix string - settings *cluster.Settings -} - -var _ ExternalStorage = &gcsStorage{} - -func (g *gcsStorage) Conf() roachpb.ExternalStorage { - return roachpb.ExternalStorage{ - Provider: roachpb.ExternalStorageProvider_GoogleCloud, - GoogleCloudConfig: g.conf, - } -} - -func makeGCSStorage( - ctx context.Context, conf *roachpb.ExternalStorage_GCS, settings *cluster.Settings, -) (ExternalStorage, error) { - if conf == nil { - return nil, errors.Errorf("google cloud storage upload requested but info missing") - } - const scope = gcs.ScopeReadWrite - opts := []option.ClientOption{option.WithScopes(scope)} - - // "default": only use the key in the settings; error if not present. - // "specified": the JSON object for authentication is given by the CREDENTIALS param. - // "implicit": only use the environment data. - // "": if default key is in the settings use it; otherwise use environment data. - switch conf.Auth { - case "", authParamDefault: - var key string - if settings != nil { - key = gcsDefault.Get(&settings.SV) - } - // We expect a key to be present if default is specified. 
- if conf.Auth == authParamDefault && key == "" { - return nil, errors.Errorf("expected settings value for %s", cloudstorageGSDefaultKey) - } - if key != "" { - source, err := google.JWTConfigFromJSON([]byte(key), scope) - if err != nil { - return nil, errors.Wrap(err, "creating GCS oauth token source") - } - opts = append(opts, option.WithTokenSource(source.TokenSource(ctx))) - } - case authParamSpecified: - if conf.Credentials == "" { - return nil, errors.Errorf( - "%s is set to '%s', but %s is not set", - AuthParam, - authParamSpecified, - CredentialsParam, - ) - } - decodedKey, err := base64.StdEncoding.DecodeString(conf.Credentials) - if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("decoding value of %s", CredentialsParam)) - } - source, err := google.JWTConfigFromJSON(decodedKey, scope) - if err != nil { - return nil, errors.Wrap(err, "creating GCS oauth token source from specified credentials") - } - opts = append(opts, option.WithTokenSource(source.TokenSource(ctx))) - case authParamImplicit: - // Do nothing; use implicit params: - // https://godoc.org/golang.org/x/oauth2/google#FindDefaultCredentials - default: - return nil, errors.Errorf("unsupported value %s for %s", conf.Auth, AuthParam) - } - g, err := gcs.NewClient(ctx, opts...) - if err != nil { - return nil, errors.Wrap(err, "failed to create google cloud client") - } - bucket := g.Bucket(conf.Bucket) - if conf.BillingProject != `` { - bucket = bucket.UserProject(conf.BillingProject) - } - return &gcsStorage{ - bucket: bucket, - client: g, - conf: conf, - prefix: conf.Prefix, - settings: settings, - }, nil -} - -func (g *gcsStorage) WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error { - const maxAttempts = 3 - err := retry.WithMaxAttempts(ctx, base.DefaultRetryOptions(), maxAttempts, func() error { - if _, err := content.Seek(0, io.SeekStart); err != nil { - return err - } - // Set the timeout within the retry loop. 
- return contextutil.RunWithTimeout(ctx, "put gcs file", timeoutSetting.Get(&g.settings.SV), - func(ctx context.Context) error { - w := g.bucket.Object(filepath.Join(g.prefix, basename)).NewWriter(ctx) - if _, err := io.Copy(w, content); err != nil { - _ = w.Close() - return err - } - return w.Close() - }) - }) - return errors.Wrap(err, "write to google cloud") -} - -func (g *gcsStorage) ReadFile(ctx context.Context, basename string) (io.ReadCloser, error) { - // https://github.com/cockroachdb/cockroach/issues/23859 - - var rc io.ReadCloser - err := delayedRetry(ctx, func() error { - var readErr error - rc, readErr = g.bucket.Object(filepath.Join(g.prefix, basename)).NewReader(ctx) - return readErr - }) - return rc, err -} - -func (g *gcsStorage) ListFiles(ctx context.Context) ([]string, error) { - var fileList []string - it := g.bucket.Objects(ctx, &gcs.Query{ - Prefix: getBucketBeforeWildcard(g.prefix), - }) - for { - attrs, err := it.Next() - if err == iterator.Done { - break - } - if err != nil { - return nil, errors.Wrap(err, "unable to list files in gcs bucket") - } - - matches, errMatch := filepath.Match(g.prefix, attrs.Name) - if errMatch != nil { - continue - } - if matches { - gsURL := url.URL{ - Scheme: "gs", - Host: attrs.Bucket, - Path: attrs.Name, - } - fileList = append(fileList, gsURL.String()) - } - } - - return fileList, nil -} - -func (g *gcsStorage) Delete(ctx context.Context, basename string) error { - return contextutil.RunWithTimeout(ctx, "delete gcs file", - timeoutSetting.Get(&g.settings.SV), - func(ctx context.Context) error { - return g.bucket.Object(filepath.Join(g.prefix, basename)).Delete(ctx) - }) -} - -func (g *gcsStorage) Size(ctx context.Context, basename string) (int64, error) { - var r *gcs.Reader - if err := contextutil.RunWithTimeout(ctx, "size gcs file", - timeoutSetting.Get(&g.settings.SV), - func(ctx context.Context) error { - var err error - r, err = g.bucket.Object(filepath.Join(g.prefix, basename)).NewReader(ctx) - return err - }); err != nil { - return 0, err - } - sz := r.Attrs.Size - _ = r.Close() - return sz, nil -} - -func (g *gcsStorage) Close() error { - return g.client.Close() -} - -type azureStorage struct { - conf *roachpb.ExternalStorage_Azure - container azblob.ContainerURL - prefix string - settings *cluster.Settings -} - -var _ ExternalStorage = &azureStorage{} - -func makeAzureStorage( - conf *roachpb.ExternalStorage_Azure, settings *cluster.Settings, -) (ExternalStorage, error) { - if conf == nil { - return nil, errors.Errorf("azure upload requested but info missing") - } - credential, err := azblob.NewSharedKeyCredential(conf.AccountName, conf.AccountKey) - if err != nil { - return nil, errors.Wrap(err, "azure credential") - } - p := azblob.NewPipeline(credential, azblob.PipelineOptions{}) - u, err := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", conf.AccountName)) - if err != nil { - return nil, errors.Wrap(err, "azure: account name is not valid") - } - serviceURL := azblob.NewServiceURL(*u, p) - return &azureStorage{ - conf: conf, - container: serviceURL.NewContainerURL(conf.Container), - prefix: conf.Prefix, - settings: settings, - }, nil -} - -func (s *azureStorage) getBlob(basename string) azblob.BlockBlobURL { - name := filepath.Join(s.prefix, basename) - return s.container.NewBlockBlobURL(name) -} - -func (s *azureStorage) Conf() roachpb.ExternalStorage { - return roachpb.ExternalStorage{ - Provider: roachpb.ExternalStorageProvider_Azure, - AzureConfig: s.conf, - } -} - -func (s *azureStorage) WriteFile( - ctx 
context.Context, basename string, content io.ReadSeeker, -) error { - err := contextutil.RunWithTimeout(ctx, "write azure file", timeoutSetting.Get(&s.settings.SV), - func(ctx context.Context) error { - blob := s.getBlob(basename) - _, err := blob.Upload(ctx, content, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}) - return err - }) - return errors.Wrapf(err, "write file: %s", basename) -} - -func (s *azureStorage) ReadFile(ctx context.Context, basename string) (io.ReadCloser, error) { - // https://github.com/cockroachdb/cockroach/issues/23859 - blob := s.getBlob(basename) - get, err := blob.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false) - if err != nil { - return nil, errors.Wrap(err, "failed to create azure reader") - } - reader := get.Body(azblob.RetryReaderOptions{MaxRetryRequests: 3}) - return reader, nil -} - -func (s *azureStorage) ListFiles(ctx context.Context) ([]string, error) { - var fileList []string - response, err := s.container.ListBlobsFlatSegment(ctx, - azblob.Marker{}, - azblob.ListBlobsSegmentOptions{Prefix: getBucketBeforeWildcard(s.prefix)}, - ) - if err != nil { - return nil, errors.Wrap(err, "unable to list files for specified blob") - } - - for _, blob := range response.Segment.BlobItems { - matches, err := filepath.Match(s.prefix, blob.Name) - if err != nil { - continue - } - if matches { - azureURL := url.URL{ - Scheme: "azure", - Host: strings.TrimPrefix(s.container.URL().Path, "/"), - Path: blob.Name, - } - fileList = append(fileList, azureURL.String()) - } - } - - return fileList, nil -} - -func (s *azureStorage) Delete(ctx context.Context, basename string) error { - err := contextutil.RunWithTimeout(ctx, "delete azure file", timeoutSetting.Get(&s.settings.SV), - func(ctx context.Context) error { - blob := s.getBlob(basename) - _, err := blob.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}) - return err - }) - return errors.Wrap(err, "delete file") -} - -func (s *azureStorage) Size(ctx context.Context, basename string) (int64, error) { - var props *azblob.BlobGetPropertiesResponse - err := contextutil.RunWithTimeout(ctx, "size azure file", timeoutSetting.Get(&s.settings.SV), - func(ctx context.Context) error { - blob := s.getBlob(basename) - var err error - props, err = blob.GetProperties(ctx, azblob.BlobAccessConditions{}) - return err - }) - if err != nil { - return 0, errors.Wrap(err, "get file properties") - } - return props.ContentLength(), nil -} - -func (s *azureStorage) Close() error { - return nil -} - -// ParseWorkloadConfig parses a workload config URI to a proto config. 
-func ParseWorkloadConfig(uri *url.URL) (*roachpb.ExternalStorage_Workload, error) { - c := &roachpb.ExternalStorage_Workload{} - pathParts := strings.Split(strings.Trim(uri.Path, `/`), `/`) - if len(pathParts) != 3 { - return nil, errors.Errorf( - `path must be of the form ///: %s`, uri.Path) - } - c.Format, c.Generator, c.Table = pathParts[0], pathParts[1], pathParts[2] - q := uri.Query() - if _, ok := q[`version`]; !ok { - return nil, errors.New(`parameter version is required`) - } - c.Version = q.Get(`version`) - q.Del(`version`) - if s := q.Get(`row-start`); len(s) > 0 { - q.Del(`row-start`) - var err error - if c.BatchBegin, err = strconv.ParseInt(s, 10, 64); err != nil { - return nil, err - } - } - if e := q.Get(`row-end`); len(e) > 0 { - q.Del(`row-end`) - var err error - if c.BatchEnd, err = strconv.ParseInt(e, 10, 64); err != nil { - return nil, err - } - } - for k, vs := range q { - for _, v := range vs { - c.Flags = append(c.Flags, `--`+k+`=`+v) - } - } - return c, nil -} - -type workloadStorage struct { - conf *roachpb.ExternalStorage_Workload - gen workload.Generator - table workload.Table -} - -var _ ExternalStorage = &workloadStorage{} - -func makeWorkloadStorage(conf *roachpb.ExternalStorage_Workload) (ExternalStorage, error) { - if conf == nil { - return nil, errors.Errorf("workload upload requested but info missing") - } - if strings.ToLower(conf.Format) != `csv` { - return nil, errors.Errorf(`unsupported format: %s`, conf.Format) - } - meta, err := workload.Get(conf.Generator) - if err != nil { - return nil, err - } - // Different versions of the workload could generate different data, so - // disallow this. - if meta.Version != conf.Version { - return nil, errors.Errorf( - `expected %s version "%s" but got "%s"`, meta.Name, conf.Version, meta.Version) - } - gen := meta.New() - if f, ok := gen.(workload.Flagser); ok { - if err := f.Flags().Parse(conf.Flags); err != nil { - return nil, errors.Wrapf(err, `parsing parameters %s`, strings.Join(conf.Flags, ` `)) - } - } - s := &workloadStorage{ - conf: conf, - gen: gen, - } - for _, t := range gen.Tables() { - if t.Name == conf.Table { - s.table = t - break - } - } - if s.table.Name == `` { - return nil, errors.Wrapf(err, `unknown table %s for generator %s`, conf.Table, meta.Name) - } - return s, nil -} - -func (s *workloadStorage) Conf() roachpb.ExternalStorage { - return roachpb.ExternalStorage{ - Provider: roachpb.ExternalStorageProvider_Workload, - WorkloadConfig: s.conf, - } -} - -func (s *workloadStorage) ReadFile(_ context.Context, basename string) (io.ReadCloser, error) { - if basename != `` { - return nil, errors.Errorf(`basenames are not supported by workload storage`) - } - r := workload.NewCSVRowsReader(s.table, int(s.conf.BatchBegin), int(s.conf.BatchEnd)) - return ioutil.NopCloser(r), nil -} - -func (s *workloadStorage) WriteFile(_ context.Context, _ string, _ io.ReadSeeker) error { - return errors.Errorf(`workload storage does not support writes`) -} - -func (s *workloadStorage) ListFiles(_ context.Context) ([]string, error) { - return nil, errors.Errorf(`workload storage does not support listing files`) -} - -func (s *workloadStorage) Delete(_ context.Context, _ string) error { - return errors.Errorf(`workload storage does not support deletes`) -} -func (s *workloadStorage) Size(_ context.Context, _ string) (int64, error) { - return 0, errors.Errorf(`workload storage does not support sizing`) -} -func (s *workloadStorage) Close() error { - return nil -} diff --git a/pkg/storage/cloud/external_storage_test.go 
b/pkg/storage/cloud/external_storage_test.go index 37feca75000a..971f044a0f45 100644 --- a/pkg/storage/cloud/external_storage_test.go +++ b/pkg/storage/cloud/external_storage_test.go @@ -15,28 +15,18 @@ import ( "context" "crypto/rand" "encoding/base64" - "encoding/pem" "fmt" - "io" "io/ioutil" - "net/http" - "net/http/httptest" "net/url" "os" "path/filepath" "sort" - "strconv" "strings" "testing" - "time" - "github.com/aws/aws-sdk-go/aws/credentials" "github.com/cockroachdb/cockroach/pkg/blobs" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/cockroach/pkg/workload/bank" "github.com/spf13/pflag" @@ -319,295 +309,6 @@ func testListFiles(t *testing.T, storeURI string) { } } -func TestPutLocal(t *testing.T) { - defer leaktest.AfterTest(t)() - - p, cleanupFn := testutils.TempDir(t) - defer cleanupFn() - - testSettings.ExternalIODir = p - dest := MakeLocalStorageURI(p) - - testExportStore(t, dest, false) - testListFiles(t, fmt.Sprintf("nodelocal:///%s", "listing-test")) -} - -func TestLocalIOLimits(t *testing.T) { - defer leaktest.AfterTest(t)() - - ctx := context.TODO() - const allowed = "/allowed" - testSettings.ExternalIODir = allowed - - clientFactory := blobs.TestBlobServiceClient(testSettings.ExternalIODir) - for dest, expected := range map[string]string{allowed: "", "/../../blah": "not allowed"} { - u := fmt.Sprintf("nodelocal://%s", dest) - e, err := ExternalStorageFromURI(ctx, u, testSettings, clientFactory) - if err != nil { - t.Fatal(err) - } - _, err = e.ListFiles(ctx) - if !testutils.IsError(err, expected) { - t.Fatal(err) - } - } - - for host, expectErr := range map[string]bool{"": false, "1": false, "0": false, "blah": true} { - u := fmt.Sprintf("nodelocal://%s/path/to/file", host) - - var expected string - if expectErr { - expected = "host component of nodelocal URI must be a node ID" - } - if _, err := ExternalStorageConfFromURI(u); !testutils.IsError(err, expected) { - t.Fatalf("%q: expected error %q, got %v", u, expected, err) - } - } -} - -func TestPutHttp(t *testing.T) { - defer leaktest.AfterTest(t)() - - tmp, dirCleanup := testutils.TempDir(t) - defer dirCleanup() - - const badHeadResponse = "bad-head-response" - - makeServer := func() (*url.URL, func() int, func()) { - var files int - srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - localfile := filepath.Join(tmp, filepath.Base(r.URL.Path)) - switch r.Method { - case "PUT": - f, err := os.Create(localfile) - if err != nil { - http.Error(w, err.Error(), 500) - return - } - defer f.Close() - if _, err := io.Copy(f, r.Body); err != nil { - http.Error(w, err.Error(), 500) - return - } - files++ - w.WriteHeader(201) - case "GET", "HEAD": - if filepath.Base(localfile) == badHeadResponse { - http.Error(w, "HEAD not implemented", 500) - return - } - http.ServeFile(w, r, localfile) - case "DELETE": - if err := os.Remove(localfile); err != nil { - http.Error(w, err.Error(), 500) - return - } - w.WriteHeader(204) - default: - http.Error(w, "unsupported method "+r.Method, 400) - } - })) - - u := testSettings.MakeUpdater() - if err := u.Set( - cloudstorageHTTPCASetting, - string(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: srv.Certificate().Raw})), - "s", - ); err != nil { - t.Fatal(err) - 
} - - cleanup := func() { - srv.Close() - if err := u.Set(cloudstorageHTTPCASetting, "", "s"); err != nil { - t.Fatal(err) - } - } - - t.Logf("Mock HTTP Storage %q", srv.URL) - uri, err := url.Parse(srv.URL) - if err != nil { - srv.Close() - t.Fatal(err) - } - uri.Path = filepath.Join(uri.Path, "testing") - return uri, func() int { return files }, cleanup - } - - t.Run("singleHost", func(t *testing.T) { - srv, files, cleanup := makeServer() - defer cleanup() - testExportStore(t, srv.String(), false) - if expected, actual := 13, files(); expected != actual { - t.Fatalf("expected %d files to be written to single http store, got %d", expected, actual) - } - }) - - t.Run("multiHost", func(t *testing.T) { - srv1, files1, cleanup1 := makeServer() - defer cleanup1() - srv2, files2, cleanup2 := makeServer() - defer cleanup2() - srv3, files3, cleanup3 := makeServer() - defer cleanup3() - - combined := *srv1 - combined.Host = strings.Join([]string{srv1.Host, srv2.Host, srv3.Host}, ",") - - testExportStore(t, combined.String(), true) - if expected, actual := 3, files1(); expected != actual { - t.Fatalf("expected %d files written to http host 1, got %d", expected, actual) - } - if expected, actual := 4, files2(); expected != actual { - t.Fatalf("expected %d files written to http host 2, got %d", expected, actual) - } - if expected, actual := 4, files3(); expected != actual { - t.Fatalf("expected %d files written to http host 3, got %d", expected, actual) - } - }) - - // Ensure that servers that error on HEAD are handled gracefully. - t.Run("bad-head-response", func(t *testing.T) { - ctx := context.TODO() - - srv, _, cleanup := makeServer() - defer cleanup() - - conf, err := ExternalStorageConfFromURI(srv.String()) - if err != nil { - t.Fatal(err) - } - s, err := MakeExternalStorage(ctx, conf, testSettings, blobs.TestEmptyBlobClientFactory) - if err != nil { - t.Fatal(err) - } - defer s.Close() - - const file = "file" - var content = []byte("contents") - if err := s.WriteFile(ctx, file, bytes.NewReader(content)); err != nil { - t.Fatal(err) - } - if err := s.WriteFile(ctx, badHeadResponse, bytes.NewReader(content)); err != nil { - t.Fatal(err) - } - if sz, err := s.Size(ctx, file); err != nil { - t.Fatal(err) - } else if sz != int64(len(content)) { - t.Fatalf("expected %d, got %d", len(content), sz) - } - if sz, err := s.Size(ctx, badHeadResponse); !testutils.IsError(err, "500 Internal Server Error") { - t.Fatalf("unexpected error: %v", err) - } else if sz != 0 { - t.Fatalf("expected 0 size, got %d", sz) - } - }) -} - -func TestPutS3(t *testing.T) { - defer leaktest.AfterTest(t)() - - // If environment credentials are not present, we want to - // skip all S3 tests, including auth-implicit, even though - // it is not used in auth-implicit. - creds, err := credentials.NewEnvCredentials().Get() - if err != nil { - t.Skip("No AWS credentials") - } - bucket := os.Getenv("AWS_S3_BUCKET") - if bucket == "" { - t.Skip("AWS_S3_BUCKET env var must be set") - } - - ctx := context.TODO() - t.Run("auth-empty-no-cred", func(t *testing.T) { - _, err := ExternalStorageFromURI( - ctx, fmt.Sprintf("s3://%s/%s", bucket, "backup-test-default"), - testSettings, blobs.TestEmptyBlobClientFactory, - ) - require.EqualError(t, err, fmt.Sprintf( - `%s is set to '%s', but %s is not set`, - AuthParam, - authParamSpecified, - S3AccessKeyParam, - )) - }) - t.Run("auth-implicit", func(t *testing.T) { - // You can create an IAM that can access S3 - // in the AWS console, then set it up locally. 
- // https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-role.html - // We only run this test if default role exists. - credentialsProvider := credentials.SharedCredentialsProvider{} - _, err := credentialsProvider.Retrieve() - if err != nil { - t.Skip(err) - } - - testExportStore( - t, - fmt.Sprintf( - "s3://%s/%s?%s=%s", - bucket, "backup-test-default", - AuthParam, authParamImplicit, - ), - false, - ) - }) - - t.Run("auth-specified", func(t *testing.T) { - testExportStore(t, - fmt.Sprintf( - "s3://%s/%s?%s=%s&%s=%s", - bucket, "backup-test", - S3AccessKeyParam, url.QueryEscape(creds.AccessKeyID), - S3SecretParam, url.QueryEscape(creds.SecretAccessKey), - ), - false, - ) - testListFiles(t, - fmt.Sprintf( - "s3://%s/%s?%s=%s&%s=%s", - bucket, "listing-test", - S3AccessKeyParam, url.QueryEscape(creds.AccessKeyID), - S3SecretParam, url.QueryEscape(creds.SecretAccessKey), - ), - ) - }) -} - -func TestPutS3Endpoint(t *testing.T) { - defer leaktest.AfterTest(t)() - - q := make(url.Values) - expect := map[string]string{ - "AWS_S3_ENDPOINT": S3EndpointParam, - "AWS_S3_ENDPOINT_KEY": S3AccessKeyParam, - "AWS_S3_ENDPOINT_REGION": S3RegionParam, - "AWS_S3_ENDPOINT_SECRET": S3SecretParam, - } - for env, param := range expect { - v := os.Getenv(env) - if v == "" { - t.Skipf("%s env var must be set", env) - } - q.Add(param, v) - } - - bucket := os.Getenv("AWS_S3_ENDPOINT_BUCKET") - if bucket == "" { - t.Skip("AWS_S3_ENDPOINT_BUCKET env var must be set") - } - - u := url.URL{ - Scheme: "s3", - Host: bucket, - Path: "backup-test", - RawQuery: q.Encode(), - } - - testExportStore(t, u.String(), false) -} - func TestPutGoogleCloud(t *testing.T) { defer leaktest.AfterTest(t)() @@ -666,37 +367,6 @@ func TestPutGoogleCloud(t *testing.T) { }) } -func TestPutAzure(t *testing.T) { - defer leaktest.AfterTest(t)() - - accountName := os.Getenv("AZURE_ACCOUNT_NAME") - accountKey := os.Getenv("AZURE_ACCOUNT_KEY") - if accountName == "" || accountKey == "" { - t.Skip("AZURE_ACCOUNT_NAME and AZURE_ACCOUNT_KEY env vars must be set") - } - bucket := os.Getenv("AZURE_CONTAINER") - if bucket == "" { - t.Skip("AZURE_CONTAINER env var must be set") - } - - testExportStore(t, - fmt.Sprintf("azure://%s/%s?%s=%s&%s=%s", - bucket, "backup-test", - AzureAccountNameParam, url.QueryEscape(accountName), - AzureAccountKeyParam, url.QueryEscape(accountKey), - ), - false, - ) - testListFiles( - t, - fmt.Sprintf("azure://%s/%s?%s=%s&%s=%s", - bucket, "listing-test", - AzureAccountNameParam, url.QueryEscape(accountName), - AzureAccountKeyParam, url.QueryEscape(accountKey), - ), - ) -} - func TestWorkloadStorage(t *testing.T) { defer leaktest.AfterTest(t)() @@ -787,113 +457,3 @@ func TestWorkloadStorage(t *testing.T) { ) require.EqualError(t, err, `expected bank version "nope" but got "1.0.0"`) } - -func rangeStart(r string) (int, error) { - if len(r) == 0 { - return 0, nil - } - r = strings.TrimPrefix(r, "bytes=") - - return strconv.Atoi(r[:strings.IndexByte(r, '-')]) -} - -func TestHttpGet(t *testing.T) { - defer leaktest.AfterTest(t)() - data := []byte("to serve, or not to serve. 
c'est la question") - - httpRetryOptions.InitialBackoff = 1 * time.Microsecond - httpRetryOptions.MaxBackoff = 10 * time.Millisecond - httpRetryOptions.MaxRetries = 100 - - for _, tc := range []int{1, 2, 5, 16, 32, len(data) - 1, len(data)} { - t.Run(fmt.Sprintf("read-%d", tc), func(t *testing.T) { - limit := tc - s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - start, err := rangeStart(r.Header.Get("Range")) - if start < 0 || start >= len(data) { - t.Errorf("invalid start offset %d in range header %s", - start, r.Header.Get("Range")) - } - end := start + limit - if end > len(data) { - end = len(data) - } - - w.Header().Add("Accept-Ranges", "bytes") - w.Header().Add("Content-Length", strconv.Itoa(len(data)-start)) - - if start > 0 { - w.Header().Add( - "Content-Range", - fmt.Sprintf("bytes %d-%d/%d", start, end, len(data))) - } - - if err == nil { - _, err = w.Write(data[start:end]) - } - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - } - })) - - // Start antagonist function that aggressively closes client connections. - ctx, cancelAntagonist := context.WithCancel(context.Background()) - g := ctxgroup.WithContext(ctx) - g.GoCtx(func(ctx context.Context) error { - opts := retry.Options{ - InitialBackoff: 500 * time.Microsecond, - MaxBackoff: 1 * time.Millisecond, - } - for attempt := retry.StartWithCtx(ctx, opts); attempt.Next(); { - s.CloseClientConnections() - } - return nil - }) - - store, err := makeHTTPStorage(s.URL, testSettings) - require.NoError(t, err) - - var file io.ReadCloser - - // Cleanup. - defer func() { - s.Close() - if store != nil { - require.NoError(t, store.Close()) - } - if file != nil { - require.NoError(t, file.Close()) - } - cancelAntagonist() - _ = g.Wait() - }() - - // Read the file and verify results. - file, err = store.ReadFile(ctx, "/something") - require.NoError(t, err) - - b, err := ioutil.ReadAll(file) - require.NoError(t, err) - require.EqualValues(t, data, b) - }) - } -} - -func TestHttpGetWithCancelledContext(t *testing.T) { - defer leaktest.AfterTest(t)() - - s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) - defer s.Close() - - store, err := makeHTTPStorage(s.URL, testSettings) - require.NoError(t, err) - defer func() { - require.NoError(t, store.Close()) - }() - - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - _, err = store.ReadFile(ctx, "/something") - require.Error(t, context.Canceled, err) -} diff --git a/pkg/storage/cloud/gcs_storage.go b/pkg/storage/cloud/gcs_storage.go new file mode 100644 index 000000000000..f73a7a79bd6e --- /dev/null +++ b/pkg/storage/cloud/gcs_storage.go @@ -0,0 +1,210 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package cloud + +import ( + "context" + "encoding/base64" + "fmt" + "io" + "net/url" + "path/filepath" + + gcs "cloud.google.com/go/storage" + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/contextutil" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/errors" + "golang.org/x/oauth2/google" + "google.golang.org/api/iterator" + "google.golang.org/api/option" +) + +type gcsStorage struct { + bucket *gcs.BucketHandle + client *gcs.Client + conf *roachpb.ExternalStorage_GCS + prefix string + settings *cluster.Settings +} + +var _ ExternalStorage = &gcsStorage{} + +func (g *gcsStorage) Conf() roachpb.ExternalStorage { + return roachpb.ExternalStorage{ + Provider: roachpb.ExternalStorageProvider_GoogleCloud, + GoogleCloudConfig: g.conf, + } +} + +func makeGCSStorage( + ctx context.Context, conf *roachpb.ExternalStorage_GCS, settings *cluster.Settings, +) (ExternalStorage, error) { + if conf == nil { + return nil, errors.Errorf("google cloud storage upload requested but info missing") + } + const scope = gcs.ScopeReadWrite + opts := []option.ClientOption{option.WithScopes(scope)} + + // "default": only use the key in the settings; error if not present. + // "specified": the JSON object for authentication is given by the CREDENTIALS param. + // "implicit": only use the environment data. + // "": if default key is in the settings use it; otherwise use environment data. + switch conf.Auth { + case "", authParamDefault: + var key string + if settings != nil { + key = gcsDefault.Get(&settings.SV) + } + // We expect a key to be present if default is specified. + if conf.Auth == authParamDefault && key == "" { + return nil, errors.Errorf("expected settings value for %s", cloudstorageGSDefaultKey) + } + if key != "" { + source, err := google.JWTConfigFromJSON([]byte(key), scope) + if err != nil { + return nil, errors.Wrap(err, "creating GCS oauth token source") + } + opts = append(opts, option.WithTokenSource(source.TokenSource(ctx))) + } + case authParamSpecified: + if conf.Credentials == "" { + return nil, errors.Errorf( + "%s is set to '%s', but %s is not set", + AuthParam, + authParamSpecified, + CredentialsParam, + ) + } + decodedKey, err := base64.StdEncoding.DecodeString(conf.Credentials) + if err != nil { + return nil, errors.Wrap(err, fmt.Sprintf("decoding value of %s", CredentialsParam)) + } + source, err := google.JWTConfigFromJSON(decodedKey, scope) + if err != nil { + return nil, errors.Wrap(err, "creating GCS oauth token source from specified credentials") + } + opts = append(opts, option.WithTokenSource(source.TokenSource(ctx))) + case authParamImplicit: + // Do nothing; use implicit params: + // https://godoc.org/golang.org/x/oauth2/google#FindDefaultCredentials + default: + return nil, errors.Errorf("unsupported value %s for %s", conf.Auth, AuthParam) + } + g, err := gcs.NewClient(ctx, opts...) 
+ if err != nil { + return nil, errors.Wrap(err, "failed to create google cloud client") + } + bucket := g.Bucket(conf.Bucket) + if conf.BillingProject != `` { + bucket = bucket.UserProject(conf.BillingProject) + } + return &gcsStorage{ + bucket: bucket, + client: g, + conf: conf, + prefix: conf.Prefix, + settings: settings, + }, nil +} + +func (g *gcsStorage) WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error { + const maxAttempts = 3 + err := retry.WithMaxAttempts(ctx, base.DefaultRetryOptions(), maxAttempts, func() error { + if _, err := content.Seek(0, io.SeekStart); err != nil { + return err + } + // Set the timeout within the retry loop. + return contextutil.RunWithTimeout(ctx, "put gcs file", timeoutSetting.Get(&g.settings.SV), + func(ctx context.Context) error { + w := g.bucket.Object(filepath.Join(g.prefix, basename)).NewWriter(ctx) + if _, err := io.Copy(w, content); err != nil { + _ = w.Close() + return err + } + return w.Close() + }) + }) + return errors.Wrap(err, "write to google cloud") +} + +func (g *gcsStorage) ReadFile(ctx context.Context, basename string) (io.ReadCloser, error) { + // https://github.com/cockroachdb/cockroach/issues/23859 + + var rc io.ReadCloser + err := delayedRetry(ctx, func() error { + var readErr error + rc, readErr = g.bucket.Object(filepath.Join(g.prefix, basename)).NewReader(ctx) + return readErr + }) + return rc, err +} + +func (g *gcsStorage) ListFiles(ctx context.Context) ([]string, error) { + var fileList []string + it := g.bucket.Objects(ctx, &gcs.Query{ + Prefix: getBucketBeforeWildcard(g.prefix), + }) + for { + attrs, err := it.Next() + if err == iterator.Done { + break + } + if err != nil { + return nil, errors.Wrap(err, "unable to list files in gcs bucket") + } + + matches, errMatch := filepath.Match(g.prefix, attrs.Name) + if errMatch != nil { + continue + } + if matches { + gsURL := url.URL{ + Scheme: "gs", + Host: attrs.Bucket, + Path: attrs.Name, + } + fileList = append(fileList, gsURL.String()) + } + } + + return fileList, nil +} + +func (g *gcsStorage) Delete(ctx context.Context, basename string) error { + return contextutil.RunWithTimeout(ctx, "delete gcs file", + timeoutSetting.Get(&g.settings.SV), + func(ctx context.Context) error { + return g.bucket.Object(filepath.Join(g.prefix, basename)).Delete(ctx) + }) +} + +func (g *gcsStorage) Size(ctx context.Context, basename string) (int64, error) { + var r *gcs.Reader + if err := contextutil.RunWithTimeout(ctx, "size gcs file", + timeoutSetting.Get(&g.settings.SV), + func(ctx context.Context) error { + var err error + r, err = g.bucket.Object(filepath.Join(g.prefix, basename)).NewReader(ctx) + return err + }); err != nil { + return 0, err + } + sz := r.Attrs.Size + _ = r.Close() + return sz, nil +} + +func (g *gcsStorage) Close() error { + return g.client.Close() +} diff --git a/pkg/storage/cloud/http_storage.go b/pkg/storage/cloud/http_storage.go new file mode 100644 index 000000000000..a510382f7623 --- /dev/null +++ b/pkg/storage/cloud/http_storage.go @@ -0,0 +1,378 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package cloud + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "hash/fnv" + "io" + "io/ioutil" + "net/http" + "net/url" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/contextutil" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/sysutil" + "github.com/cockroachdb/errors" +) + +type httpStorage struct { + base *url.URL + client *http.Client + hosts []string + settings *cluster.Settings +} + +var _ ExternalStorage = &httpStorage{} + +type retryableHTTPError struct { + cause error +} + +func (e *retryableHTTPError) Error() string { + return fmt.Sprintf("retryable http error: %s", e.cause) +} + +// package visible for test. +var httpRetryOptions = retry.Options{ + InitialBackoff: 100 * time.Millisecond, + MaxBackoff: 2 * time.Second, + MaxRetries: 32, + Multiplier: 4, +} + +func makeHTTPClient(settings *cluster.Settings) (*http.Client, error) { + var tlsConf *tls.Config + if pem := httpCustomCA.Get(&settings.SV); pem != "" { + roots, err := x509.SystemCertPool() + if err != nil { + return nil, errors.Wrap(err, "could not load system root CA pool") + } + if !roots.AppendCertsFromPEM([]byte(pem)) { + return nil, errors.Errorf("failed to parse root CA certificate from %q", pem) + } + tlsConf = &tls.Config{RootCAs: roots} + } + // Copy the defaults from http.DefaultTransport. We cannot just copy the + // entire struct because it has a sync Mutex. This has the unfortunate problem + // that if Go adds fields to DefaultTransport they won't be copied here, + // but this is ok for now. + t := http.DefaultTransport.(*http.Transport) + return &http.Client{Transport: &http.Transport{ + Proxy: t.Proxy, + DialContext: t.DialContext, + MaxIdleConns: t.MaxIdleConns, + IdleConnTimeout: t.IdleConnTimeout, + TLSHandshakeTimeout: t.TLSHandshakeTimeout, + ExpectContinueTimeout: t.ExpectContinueTimeout, + + // Add our custom CA. + TLSClientConfig: tlsConf, + }}, nil +} + +func makeHTTPStorage(base string, settings *cluster.Settings) (ExternalStorage, error) { + if base == "" { + return nil, errors.Errorf("HTTP storage requested but base path not provided") + } + + client, err := makeHTTPClient(settings) + if err != nil { + return nil, err + } + uri, err := url.Parse(base) + if err != nil { + return nil, err + } + return &httpStorage{ + base: uri, + client: client, + hosts: strings.Split(uri.Host, ","), + settings: settings, + }, nil +} + +func (h *httpStorage) Conf() roachpb.ExternalStorage { + return roachpb.ExternalStorage{ + Provider: roachpb.ExternalStorageProvider_Http, + HttpPath: roachpb.ExternalStorage_Http{ + BaseUri: h.base.String(), + }, + } +} + +type resumingHTTPReader struct { + body io.ReadCloser + canResume bool // Can we resume if download aborts prematurely? + pos int64 // How much data was received so far. 
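+
+	// Saved request state, used to re-issue ranged GET requests when a
+	// download is interrupted and must be resumed: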
+	ctx    context.Context
+	url    string
+	client *httpStorage
+}
+
+var _ io.ReadCloser = &resumingHTTPReader{}
+
+func newResumingHTTPReader(
+	ctx context.Context, client *httpStorage, url string,
+) (*resumingHTTPReader, error) {
+	r := &resumingHTTPReader{
+		ctx:    ctx,
+		client: client,
+		url:    url,
+	}
+
+	resp, err := r.sendRequest(nil)
+	if err != nil {
+		return nil, err
+	}
+
+	r.canResume = resp.Header.Get("Accept-Ranges") == "bytes"
+	r.body = resp.Body
+	return r, nil
+}
+
+func (r *resumingHTTPReader) Close() error {
+	if r.body != nil {
+		return r.body.Close()
+	}
+	return nil
+}
+
+// checkHTTPContentRangeHeader parses the Content-Range header and
+// ensures that the range start offset is the same as 'pos'.
+// See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Range
+func checkHTTPContentRangeHeader(h string, pos int64) error {
+	if len(h) == 0 {
+		return errors.New("http server does not honor download resume")
+	}
+
+	h = strings.TrimPrefix(h, "bytes ")
+	dash := strings.IndexByte(h, '-')
+	if dash <= 0 {
+		return errors.Errorf("malformed Content-Range header: %s", h)
+	}
+
+	resume, err := strconv.ParseInt(h[:dash], 10, 64)
+	if err != nil {
+		return errors.Errorf("malformed start offset in Content-Range header: %s", h)
+	}
+
+	if resume != pos {
+		return errors.Errorf(
+			"expected resume position %d, found %d instead in Content-Range header: %s",
+			pos, resume, h)
+	}
+	return nil
+}
+
+func (r *resumingHTTPReader) sendRequest(
+	reqHeaders map[string]string,
+) (resp *http.Response, err error) {
+	// Initialize err to context.Canceled: if our context is canceled, we will
+	// never enter the loop below, and in that case we want to return "nil, canceled".
+	err = context.Canceled
+	for attempt, retries := 0,
+		retry.StartWithCtx(r.ctx, httpRetryOptions); retries.Next(); attempt++ {
+		resp, err = r.client.req(r.ctx, "GET", r.url, nil, reqHeaders)
+
+		if err == nil {
+			return
+		}
+
+		log.Errorf(r.ctx, "HTTP:Req error: err=%s (attempt %d)", err, attempt)
+
+		if _, ok := err.(*retryableHTTPError); !ok {
+			return
+		}
+	}
+
+	return
+}
+
+// requestNextRange issues an additional HTTP request
+// to continue downloading the next range of bytes.
+func (r *resumingHTTPReader) requestNextRange() (err error) {
+	if err := r.body.Close(); err != nil {
+		return err
+	}
+
+	r.body = nil
+	var resp *http.Response
+	resp, err = r.sendRequest(map[string]string{"Range": fmt.Sprintf("bytes=%d-", r.pos)})
+
+	if err == nil {
+		err = checkHTTPContentRangeHeader(resp.Header.Get("Content-Range"), r.pos)
+	}
+
+	if err == nil {
+		r.body = resp.Body
+	}
+	return
+}
+
+// isResumableHTTPError returns true if we can
+// resume the download after receiving an error 'err'.
+// We can attempt to resume the download if the error is ErrUnexpectedEOF.
+// In particular, we should not worry about the case when the error is io.EOF.
+// The reason for this is two-fold:
+// 1. The underlying http library converts io.EOF to io.ErrUnexpectedEOF
+// if the number of bytes transferred is less than the number of
+// bytes advertised in the Content-Length header. So if we see
+// io.ErrUnexpectedEOF we can simply request the next range.
+// 2. If the server did *not* advertise Content-Length, then
+// there is really nothing we can do: the http standard says that
+// the stream ends when the server terminates the connection.
+// In addition, we treat connection reset by peer errors (which can
+// happen if we didn't read from the connection for too long due to e.g. load)
+// the same as unexpected EOF errors.
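+//
+// For example: if the connection drops after 1024 bytes of a 4096-byte
+// file have been read, Read sees io.ErrUnexpectedEOF and, because the
+// server advertised "Accept-Ranges: bytes", requestNextRange re-issues
+// the request with "Range: bytes=1024-" and verifies that the returned
+// Content-Range header resumes at offset 1024 before reading continues.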
+func isResumableHTTPError(err error) bool { + return errors.Is(err, io.ErrUnexpectedEOF) || sysutil.IsErrConnectionReset(err) +} + +const maxNoProgressReads = 3 + +// Read implements io.Reader interface to read the data from the underlying +// http stream, issuing additional requests in case download is interrupted. +func (r *resumingHTTPReader) Read(p []byte) (n int, err error) { + for retries := 0; n == 0 && err == nil; retries++ { + n, err = r.body.Read(p) + r.pos += int64(n) + + if err != nil && !errors.IsAny(err, io.EOF, io.ErrUnexpectedEOF) { + log.Errorf(r.ctx, "HTTP:Read err: %s", err) + } + + // Resume download if the http server supports this. + if r.canResume && isResumableHTTPError(err) { + log.Errorf(r.ctx, "HTTP:Retry: error %s", err) + if retries > maxNoProgressReads { + err = errors.Wrap(err, "multiple Read calls return no data") + return + } + err = r.requestNextRange() + } + } + + return +} + +func (h *httpStorage) ReadFile(ctx context.Context, basename string) (io.ReadCloser, error) { + // https://github.com/cockroachdb/cockroach/issues/23859 + return newResumingHTTPReader(ctx, h, basename) +} + +func (h *httpStorage) WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error { + return contextutil.RunWithTimeout(ctx, fmt.Sprintf("PUT %s", basename), + timeoutSetting.Get(&h.settings.SV), func(ctx context.Context) error { + _, err := h.reqNoBody(ctx, "PUT", basename, content) + return err + }) +} + +func (h *httpStorage) ListFiles(_ context.Context) ([]string, error) { + return nil, errors.New(`http storage does not support listing files`) +} + +func (h *httpStorage) Delete(ctx context.Context, basename string) error { + return contextutil.RunWithTimeout(ctx, fmt.Sprintf("DELETE %s", basename), + timeoutSetting.Get(&h.settings.SV), func(ctx context.Context) error { + _, err := h.reqNoBody(ctx, "DELETE", basename, nil) + return err + }) +} + +func (h *httpStorage) Size(ctx context.Context, basename string) (int64, error) { + var resp *http.Response + if err := contextutil.RunWithTimeout(ctx, fmt.Sprintf("HEAD %s", basename), + timeoutSetting.Get(&h.settings.SV), func(ctx context.Context) error { + var err error + resp, err = h.reqNoBody(ctx, "HEAD", basename, nil) + return err + }); err != nil { + return 0, err + } + if resp.ContentLength < 0 { + return 0, errors.Errorf("bad ContentLength: %d", resp.ContentLength) + } + return resp.ContentLength, nil +} + +func (h *httpStorage) Close() error { + return nil +} + +// reqNoBody is like req but it closes the response body. +func (h *httpStorage) reqNoBody( + ctx context.Context, method, file string, body io.Reader, +) (*http.Response, error) { + resp, err := h.req(ctx, method, file, body, nil) + if resp != nil { + resp.Body.Close() + } + return resp, err +} + +func (h *httpStorage) req( + ctx context.Context, method, file string, body io.Reader, headers map[string]string, +) (*http.Response, error) { + dest := *h.base + if hosts := len(h.hosts); hosts > 1 { + if file == "" { + return nil, errors.New("cannot use a multi-host HTTP basepath for single file") + } + hash := fnv.New32a() + if _, err := hash.Write([]byte(file)); err != nil { + panic(errors.Wrap(err, `"It never returns an error." 
-- https://golang.org/pkg/hash`)) + } + dest.Host = h.hosts[int(hash.Sum32())%hosts] + } + dest.Path = filepath.Join(dest.Path, file) + url := dest.String() + req, err := http.NewRequest(method, url, body) + + if err != nil { + return nil, errors.Wrapf(err, "error constructing request %s %q", method, url) + } + req = req.WithContext(ctx) + + for key, val := range headers { + req.Header.Add(key, val) + } + + resp, err := h.client.Do(req) + if err != nil { + // We failed to establish connection to the server (we don't even have + // a response object/server response code). Those errors (e.g. due to + // network blip, or DNS resolution blip, etc) are usually transient. The + // client may choose to retry the request few times before giving up. + return nil, &retryableHTTPError{err} + } + + switch resp.StatusCode { + case 200, 201, 204, 206: + // Pass. + default: + body, _ := ioutil.ReadAll(resp.Body) + _ = resp.Body.Close() + return nil, errors.Errorf("error response from server: %s %q", resp.Status, body) + } + return resp, nil +} diff --git a/pkg/storage/cloud/http_storage_test.go b/pkg/storage/cloud/http_storage_test.go new file mode 100644 index 000000000000..98ee787eba71 --- /dev/null +++ b/pkg/storage/cloud/http_storage_test.go @@ -0,0 +1,285 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package cloud + +import ( + "bytes" + "context" + "encoding/pem" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "os" + "path/filepath" + "strconv" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/blobs" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/stretchr/testify/require" +) + +func TestPutHttp(t *testing.T) { + defer leaktest.AfterTest(t)() + + tmp, dirCleanup := testutils.TempDir(t) + defer dirCleanup() + + const badHeadResponse = "bad-head-response" + + makeServer := func() (*url.URL, func() int, func()) { + var files int + srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + localfile := filepath.Join(tmp, filepath.Base(r.URL.Path)) + switch r.Method { + case "PUT": + f, err := os.Create(localfile) + if err != nil { + http.Error(w, err.Error(), 500) + return + } + defer f.Close() + if _, err := io.Copy(f, r.Body); err != nil { + http.Error(w, err.Error(), 500) + return + } + files++ + w.WriteHeader(201) + case "GET", "HEAD": + if filepath.Base(localfile) == badHeadResponse { + http.Error(w, "HEAD not implemented", 500) + return + } + http.ServeFile(w, r, localfile) + case "DELETE": + if err := os.Remove(localfile); err != nil { + http.Error(w, err.Error(), 500) + return + } + w.WriteHeader(204) + default: + http.Error(w, "unsupported method "+r.Method, 400) + } + })) + + u := testSettings.MakeUpdater() + if err := u.Set( + cloudstorageHTTPCASetting, + string(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: srv.Certificate().Raw})), + "s", + ); err != nil { + t.Fatal(err) + } + + cleanup := func() { + srv.Close() + if err := u.Set(cloudstorageHTTPCASetting, "", "s"); err != nil 
{ + t.Fatal(err) + } + } + + t.Logf("Mock HTTP Storage %q", srv.URL) + uri, err := url.Parse(srv.URL) + if err != nil { + srv.Close() + t.Fatal(err) + } + uri.Path = filepath.Join(uri.Path, "testing") + return uri, func() int { return files }, cleanup + } + + t.Run("singleHost", func(t *testing.T) { + srv, files, cleanup := makeServer() + defer cleanup() + testExportStore(t, srv.String(), false) + if expected, actual := 13, files(); expected != actual { + t.Fatalf("expected %d files to be written to single http store, got %d", expected, actual) + } + }) + + t.Run("multiHost", func(t *testing.T) { + srv1, files1, cleanup1 := makeServer() + defer cleanup1() + srv2, files2, cleanup2 := makeServer() + defer cleanup2() + srv3, files3, cleanup3 := makeServer() + defer cleanup3() + + combined := *srv1 + combined.Host = strings.Join([]string{srv1.Host, srv2.Host, srv3.Host}, ",") + + testExportStore(t, combined.String(), true) + if expected, actual := 3, files1(); expected != actual { + t.Fatalf("expected %d files written to http host 1, got %d", expected, actual) + } + if expected, actual := 4, files2(); expected != actual { + t.Fatalf("expected %d files written to http host 2, got %d", expected, actual) + } + if expected, actual := 4, files3(); expected != actual { + t.Fatalf("expected %d files written to http host 3, got %d", expected, actual) + } + }) + + // Ensure that servers that error on HEAD are handled gracefully. + t.Run("bad-head-response", func(t *testing.T) { + ctx := context.TODO() + + srv, _, cleanup := makeServer() + defer cleanup() + + conf, err := ExternalStorageConfFromURI(srv.String()) + if err != nil { + t.Fatal(err) + } + s, err := MakeExternalStorage(ctx, conf, testSettings, blobs.TestEmptyBlobClientFactory) + if err != nil { + t.Fatal(err) + } + defer s.Close() + + const file = "file" + var content = []byte("contents") + if err := s.WriteFile(ctx, file, bytes.NewReader(content)); err != nil { + t.Fatal(err) + } + if err := s.WriteFile(ctx, badHeadResponse, bytes.NewReader(content)); err != nil { + t.Fatal(err) + } + if sz, err := s.Size(ctx, file); err != nil { + t.Fatal(err) + } else if sz != int64(len(content)) { + t.Fatalf("expected %d, got %d", len(content), sz) + } + if sz, err := s.Size(ctx, badHeadResponse); !testutils.IsError(err, "500 Internal Server Error") { + t.Fatalf("unexpected error: %v", err) + } else if sz != 0 { + t.Fatalf("expected 0 size, got %d", sz) + } + }) +} + +func rangeStart(r string) (int, error) { + if len(r) == 0 { + return 0, nil + } + r = strings.TrimPrefix(r, "bytes=") + + return strconv.Atoi(r[:strings.IndexByte(r, '-')]) +} + +func TestHttpGet(t *testing.T) { + defer leaktest.AfterTest(t)() + data := []byte("to serve, or not to serve. 
c'est la question") + + httpRetryOptions.InitialBackoff = 1 * time.Microsecond + httpRetryOptions.MaxBackoff = 10 * time.Millisecond + httpRetryOptions.MaxRetries = 100 + + for _, tc := range []int{1, 2, 5, 16, 32, len(data) - 1, len(data)} { + t.Run(fmt.Sprintf("read-%d", tc), func(t *testing.T) { + limit := tc + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start, err := rangeStart(r.Header.Get("Range")) + if start < 0 || start >= len(data) { + t.Errorf("invalid start offset %d in range header %s", + start, r.Header.Get("Range")) + } + end := start + limit + if end > len(data) { + end = len(data) + } + + w.Header().Add("Accept-Ranges", "bytes") + w.Header().Add("Content-Length", strconv.Itoa(len(data)-start)) + + if start > 0 { + w.Header().Add( + "Content-Range", + fmt.Sprintf("bytes %d-%d/%d", start, end, len(data))) + } + + if err == nil { + _, err = w.Write(data[start:end]) + } + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + } + })) + + // Start antagonist function that aggressively closes client connections. + ctx, cancelAntagonist := context.WithCancel(context.Background()) + g := ctxgroup.WithContext(ctx) + g.GoCtx(func(ctx context.Context) error { + opts := retry.Options{ + InitialBackoff: 500 * time.Microsecond, + MaxBackoff: 1 * time.Millisecond, + } + for attempt := retry.StartWithCtx(ctx, opts); attempt.Next(); { + s.CloseClientConnections() + } + return nil + }) + + store, err := makeHTTPStorage(s.URL, testSettings) + require.NoError(t, err) + + var file io.ReadCloser + + // Cleanup. + defer func() { + s.Close() + if store != nil { + require.NoError(t, store.Close()) + } + if file != nil { + require.NoError(t, file.Close()) + } + cancelAntagonist() + _ = g.Wait() + }() + + // Read the file and verify results. + file, err = store.ReadFile(ctx, "/something") + require.NoError(t, err) + + b, err := ioutil.ReadAll(file) + require.NoError(t, err) + require.EqualValues(t, data, b) + }) + } +} + +func TestHttpGetWithCancelledContext(t *testing.T) { + defer leaktest.AfterTest(t)() + + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + defer s.Close() + + store, err := makeHTTPStorage(s.URL, testSettings) + require.NoError(t, err) + defer func() { + require.NoError(t, store.Close()) + }() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + _, err = store.ReadFile(ctx, "/something") + require.Error(t, context.Canceled, err) +} diff --git a/pkg/storage/cloud/nodelocal_storage.go b/pkg/storage/cloud/nodelocal_storage.go new file mode 100644 index 000000000000..e597d7169b59 --- /dev/null +++ b/pkg/storage/cloud/nodelocal_storage.go @@ -0,0 +1,122 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package cloud + +import ( + "context" + "fmt" + "io" + "path" + "strings" + + "github.com/cockroachdb/cockroach/pkg/blobs" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/errors" +) + +type localFileStorage struct { + cfg roachpb.ExternalStorage_LocalFilePath // contains un-prefixed filepath -- DO NOT use for I/O ops. 
+ base string // relative filepath prefixed with externalIODir, for I/O ops on this node. + blobClient blobs.BlobClient // inter-node file sharing service +} + +var _ ExternalStorage = &localFileStorage{} + +// MakeLocalStorageURI converts a local path (should always be relative) to a +// valid nodelocal URI. +func MakeLocalStorageURI(path string) string { + return fmt.Sprintf("nodelocal:///%s", path) +} + +func makeNodeLocalURIWithNodeID(nodeID roachpb.NodeID, path string) string { + path = strings.TrimPrefix(path, "/") + if nodeID == 0 { + return fmt.Sprintf("nodelocal:///%s", path) + } + return fmt.Sprintf("nodelocal://%d/%s", nodeID, path) +} + +func makeLocalStorage( + ctx context.Context, + cfg roachpb.ExternalStorage_LocalFilePath, + settings *cluster.Settings, + blobClientFactory blobs.BlobClientFactory, +) (ExternalStorage, error) { + if cfg.Path == "" { + return nil, errors.Errorf("Local storage requested but path not provided") + } + client, err := blobClientFactory(ctx, cfg.NodeID) + if err != nil { + return nil, errors.Wrap(err, "failed to create blob client") + } + + // In non-server execution we have no settings and no restriction on local IO. + if settings != nil { + if settings.ExternalIODir == "" { + return nil, errors.Errorf("local file access is disabled") + } + } + return &localFileStorage{base: cfg.Path, cfg: cfg, blobClient: client}, nil +} + +func (l *localFileStorage) Conf() roachpb.ExternalStorage { + return roachpb.ExternalStorage{ + Provider: roachpb.ExternalStorageProvider_LocalFile, + LocalFile: l.cfg, + } +} + +func joinRelativePath(filePath string, file string) string { + // Joining "." to make this a relative path. + // This ensures path.Clean does not simplify in unexpected ways. + return path.Join(".", filePath, file) +} + +func (l *localFileStorage) WriteFile( + ctx context.Context, basename string, content io.ReadSeeker, +) error { + return l.blobClient.WriteFile(ctx, joinRelativePath(l.base, basename), content) +} + +func (l *localFileStorage) ReadFile(ctx context.Context, basename string) (io.ReadCloser, error) { + return l.blobClient.ReadFile(ctx, joinRelativePath(l.base, basename)) +} + +func (l *localFileStorage) ListFiles(ctx context.Context) ([]string, error) { + var fileList []string + matches, err := l.blobClient.List(ctx, l.base) + if err != nil { + return nil, errors.Wrap(err, "unable to match pattern provided") + } + + for _, fileName := range matches { + fileList = append(fileList, makeNodeLocalURIWithNodeID(l.cfg.NodeID, fileName)) + } + + return fileList, nil +} + +func (l *localFileStorage) Delete(ctx context.Context, basename string) error { + return l.blobClient.Delete(ctx, joinRelativePath(l.base, basename)) +} + +func (l *localFileStorage) Size(ctx context.Context, basename string) (int64, error) { + stat, err := l.blobClient.Stat(ctx, joinRelativePath(l.base, basename)) + if err != nil { + return 0, err + } + return stat.Filesize, nil +} + +func (*localFileStorage) Close() error { + return nil +} diff --git a/pkg/storage/cloud/nodelocal_storage_test.go b/pkg/storage/cloud/nodelocal_storage_test.go new file mode 100644 index 000000000000..e7aeaafea835 --- /dev/null +++ b/pkg/storage/cloud/nodelocal_storage_test.go @@ -0,0 +1,67 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package cloud + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/blobs" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" +) + +func TestPutLocal(t *testing.T) { + defer leaktest.AfterTest(t)() + + p, cleanupFn := testutils.TempDir(t) + defer cleanupFn() + + testSettings.ExternalIODir = p + dest := MakeLocalStorageURI(p) + + testExportStore(t, dest, false) + testListFiles(t, fmt.Sprintf("nodelocal:///%s", "listing-test")) +} + +func TestLocalIOLimits(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.TODO() + const allowed = "/allowed" + testSettings.ExternalIODir = allowed + + clientFactory := blobs.TestBlobServiceClient(testSettings.ExternalIODir) + for dest, expected := range map[string]string{allowed: "", "/../../blah": "not allowed"} { + u := fmt.Sprintf("nodelocal://%s", dest) + e, err := ExternalStorageFromURI(ctx, u, testSettings, clientFactory) + if err != nil { + t.Fatal(err) + } + _, err = e.ListFiles(ctx) + if !testutils.IsError(err, expected) { + t.Fatal(err) + } + } + + for host, expectErr := range map[string]bool{"": false, "1": false, "0": false, "blah": true} { + u := fmt.Sprintf("nodelocal://%s/path/to/file", host) + + var expected string + if expectErr { + expected = "host component of nodelocal URI must be a node ID" + } + if _, err := ExternalStorageConfFromURI(u); !testutils.IsError(err, expected) { + t.Fatalf("%q: expected error %q, got %v", u, expected, err) + } + } +} diff --git a/pkg/storage/cloud/s3_storage.go b/pkg/storage/cloud/s3_storage.go new file mode 100644 index 000000000000..84adc3c14666 --- /dev/null +++ b/pkg/storage/cloud/s3_storage.go @@ -0,0 +1,224 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package cloud + +import ( + "context" + "io" + "net/url" + "path/filepath" + "strings" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/contextutil" + "github.com/cockroachdb/errors" +) + +type s3Storage struct { + bucket *string + conf *roachpb.ExternalStorage_S3 + prefix string + s3 *s3.S3 + settings *cluster.Settings +} + +var _ ExternalStorage = &s3Storage{} + +func makeS3Storage( + ctx context.Context, conf *roachpb.ExternalStorage_S3, settings *cluster.Settings, +) (ExternalStorage, error) { + if conf == nil { + return nil, errors.Errorf("s3 upload requested but info missing") + } + region := conf.Region + config := conf.Keys() + if conf.Endpoint != "" { + config.Endpoint = &conf.Endpoint + if conf.Region == "" { + region = "default-region" + } + client, err := makeHTTPClient(settings) + if err != nil { + return nil, err + } + config.HTTPClient = client + } + + // "specified": use credentials provided in URI params; error if not present. + // "implicit": enable SharedConfig, which loads in credentials from environment. + // Detailed in https://docs.aws.amazon.com/sdk-for-go/api/aws/session/ + // "": default to `specified`. + opts := session.Options{} + switch conf.Auth { + case "", authParamSpecified: + if conf.AccessKey == "" { + return nil, errors.Errorf( + "%s is set to '%s', but %s is not set", + AuthParam, + authParamSpecified, + S3AccessKeyParam, + ) + } + if conf.Secret == "" { + return nil, errors.Errorf( + "%s is set to '%s', but %s is not set", + AuthParam, + authParamSpecified, + S3SecretParam, + ) + } + opts.Config.MergeIn(config) + case authParamImplicit: + opts.SharedConfigState = session.SharedConfigEnable + default: + return nil, errors.Errorf("unsupported value %s for %s", conf.Auth, AuthParam) + } + + sess, err := session.NewSessionWithOptions(opts) + if err != nil { + return nil, errors.Wrap(err, "new aws session") + } + if region == "" { + err = delayedRetry(ctx, func() error { + var err error + region, err = s3manager.GetBucketRegion(ctx, sess, conf.Bucket, "us-east-1") + return err + }) + if err != nil { + return nil, errors.Wrap(err, "could not find s3 bucket's region") + } + } + sess.Config.Region = aws.String(region) + if conf.Endpoint != "" { + sess.Config.S3ForcePathStyle = aws.Bool(true) + } + return &s3Storage{ + bucket: aws.String(conf.Bucket), + conf: conf, + prefix: conf.Prefix, + s3: s3.New(sess), + settings: settings, + }, nil +} + +func (s *s3Storage) Conf() roachpb.ExternalStorage { + return roachpb.ExternalStorage{ + Provider: roachpb.ExternalStorageProvider_S3, + S3Config: s.conf, + } +} + +func (s *s3Storage) WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error { + err := contextutil.RunWithTimeout(ctx, "put s3 object", + timeoutSetting.Get(&s.settings.SV), + func(ctx context.Context) error { + _, err := s.s3.PutObjectWithContext(ctx, &s3.PutObjectInput{ + Bucket: s.bucket, + Key: aws.String(filepath.Join(s.prefix, basename)), + Body: content, + }) + return err + }) + return errors.Wrap(err, "failed to put s3 object") +} + +func (s *s3Storage) ReadFile(ctx context.Context, basename string) (io.ReadCloser, error) { + // https://github.com/cockroachdb/cockroach/issues/23859 + out, err := s.s3.GetObjectWithContext(ctx, &s3.GetObjectInput{ + Bucket: 
s.bucket, + Key: aws.String(filepath.Join(s.prefix, basename)), + }) + if err != nil { + return nil, errors.Wrap(err, "failed to get s3 object") + } + return out.Body, nil +} + +func getBucketBeforeWildcard(path string) string { + globIndex := strings.IndexAny(path, "*?[") + if globIndex < 0 { + return path + } + return filepath.Dir(path[:globIndex]) +} + +func (s *s3Storage) ListFiles(ctx context.Context) ([]string, error) { + var fileList []string + baseBucket := getBucketBeforeWildcard(*s.bucket) + + err := s.s3.ListObjectsPagesWithContext( + ctx, + &s3.ListObjectsInput{ + Bucket: &baseBucket, + }, + func(page *s3.ListObjectsOutput, lastPage bool) bool { + for _, fileObject := range page.Contents { + matches, err := filepath.Match(s.prefix, *fileObject.Key) + if err != nil { + continue + } + if matches { + s3URL := url.URL{ + Scheme: "s3", + Host: *s.bucket, + Path: *fileObject.Key, + } + fileList = append(fileList, s3URL.String()) + } + } + return !lastPage + }, + ) + if err != nil { + return nil, errors.Wrap(err, `failed to list s3 bucket`) + } + + return fileList, nil +} + +func (s *s3Storage) Delete(ctx context.Context, basename string) error { + return contextutil.RunWithTimeout(ctx, "delete s3 object", + timeoutSetting.Get(&s.settings.SV), + func(ctx context.Context) error { + _, err := s.s3.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{ + Bucket: s.bucket, + Key: aws.String(filepath.Join(s.prefix, basename)), + }) + return err + }) +} + +func (s *s3Storage) Size(ctx context.Context, basename string) (int64, error) { + var out *s3.HeadObjectOutput + err := contextutil.RunWithTimeout(ctx, "get s3 object header", + timeoutSetting.Get(&s.settings.SV), + func(ctx context.Context) error { + var err error + out, err = s.s3.HeadObjectWithContext(ctx, &s3.HeadObjectInput{ + Bucket: s.bucket, + Key: aws.String(filepath.Join(s.prefix, basename)), + }) + return err + }) + if err != nil { + return 0, errors.Wrap(err, "failed to get s3 object headers") + } + return *out.ContentLength, nil +} + +func (s *s3Storage) Close() error { + return nil +} diff --git a/pkg/storage/cloud/s3_storage_test.go b/pkg/storage/cloud/s3_storage_test.go new file mode 100644 index 000000000000..4b9f93ab6e8c --- /dev/null +++ b/pkg/storage/cloud/s3_storage_test.go @@ -0,0 +1,128 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package cloud + +import ( + "context" + "fmt" + "net/url" + "os" + "testing" + + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/cockroachdb/cockroach/pkg/blobs" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +func TestPutS3(t *testing.T) { + defer leaktest.AfterTest(t)() + + // If environment credentials are not present, we want to + // skip all S3 tests, including auth-implicit, even though + // it is not used in auth-implicit. 
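+	// (NewEnvCredentials reads AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
+	// from the environment.)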
+ creds, err := credentials.NewEnvCredentials().Get() + if err != nil { + t.Skip("No AWS credentials") + } + bucket := os.Getenv("AWS_S3_BUCKET") + if bucket == "" { + t.Skip("AWS_S3_BUCKET env var must be set") + } + + ctx := context.TODO() + t.Run("auth-empty-no-cred", func(t *testing.T) { + _, err := ExternalStorageFromURI( + ctx, fmt.Sprintf("s3://%s/%s", bucket, "backup-test-default"), + testSettings, blobs.TestEmptyBlobClientFactory, + ) + require.EqualError(t, err, fmt.Sprintf( + `%s is set to '%s', but %s is not set`, + AuthParam, + authParamSpecified, + S3AccessKeyParam, + )) + }) + t.Run("auth-implicit", func(t *testing.T) { + // You can create an IAM that can access S3 + // in the AWS console, then set it up locally. + // https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-role.html + // We only run this test if default role exists. + credentialsProvider := credentials.SharedCredentialsProvider{} + _, err := credentialsProvider.Retrieve() + if err != nil { + t.Skip(err) + } + + testExportStore( + t, + fmt.Sprintf( + "s3://%s/%s?%s=%s", + bucket, "backup-test-default", + AuthParam, authParamImplicit, + ), + false, + ) + }) + + t.Run("auth-specified", func(t *testing.T) { + testExportStore(t, + fmt.Sprintf( + "s3://%s/%s?%s=%s&%s=%s", + bucket, "backup-test", + S3AccessKeyParam, url.QueryEscape(creds.AccessKeyID), + S3SecretParam, url.QueryEscape(creds.SecretAccessKey), + ), + false, + ) + testListFiles(t, + fmt.Sprintf( + "s3://%s/%s?%s=%s&%s=%s", + bucket, "listing-test", + S3AccessKeyParam, url.QueryEscape(creds.AccessKeyID), + S3SecretParam, url.QueryEscape(creds.SecretAccessKey), + ), + ) + }) +} + +func TestPutS3Endpoint(t *testing.T) { + defer leaktest.AfterTest(t)() + + q := make(url.Values) + expect := map[string]string{ + "AWS_S3_ENDPOINT": S3EndpointParam, + "AWS_S3_ENDPOINT_KEY": S3AccessKeyParam, + "AWS_S3_ENDPOINT_REGION": S3RegionParam, + "AWS_S3_ENDPOINT_SECRET": S3SecretParam, + } + for env, param := range expect { + v := os.Getenv(env) + if v == "" { + t.Skipf("%s env var must be set", env) + } + q.Add(param, v) + } + + bucket := os.Getenv("AWS_S3_ENDPOINT_BUCKET") + if bucket == "" { + t.Skip("AWS_S3_ENDPOINT_BUCKET env var must be set") + } + + u := url.URL{ + Scheme: "s3", + Host: bucket, + Path: "backup-test", + RawQuery: q.Encode(), + } + + testExportStore(t, u.String(), false) +} diff --git a/pkg/storage/cloud/workload_storage.go b/pkg/storage/cloud/workload_storage.go new file mode 100644 index 000000000000..2c19f557388c --- /dev/null +++ b/pkg/storage/cloud/workload_storage.go @@ -0,0 +1,141 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+
+package cloud
+
+import (
+	"context"
+	"io"
+	"io/ioutil"
+	"net/url"
+	"strconv"
+	"strings"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/workload"
+	"github.com/cockroachdb/errors"
+)
+
+type workloadStorage struct {
+	conf  *roachpb.ExternalStorage_Workload
+	gen   workload.Generator
+	table workload.Table
+}
+
+var _ ExternalStorage = &workloadStorage{}
+
+func makeWorkloadStorage(conf *roachpb.ExternalStorage_Workload) (ExternalStorage, error) {
+	if conf == nil {
+		return nil, errors.Errorf("workload upload requested but info missing")
+	}
+	if strings.ToLower(conf.Format) != `csv` {
+		return nil, errors.Errorf(`unsupported format: %s`, conf.Format)
+	}
+	meta, err := workload.Get(conf.Generator)
+	if err != nil {
+		return nil, err
+	}
+	// Different versions of the workload could generate different data, so
+	// disallow this.
+	if meta.Version != conf.Version {
+		return nil, errors.Errorf(
+			`expected %s version "%s" but got "%s"`, meta.Name, conf.Version, meta.Version)
+	}
+	gen := meta.New()
+	if f, ok := gen.(workload.Flagser); ok {
+		if err := f.Flags().Parse(conf.Flags); err != nil {
+			return nil, errors.Wrapf(err, `parsing parameters %s`, strings.Join(conf.Flags, ` `))
+		}
+	}
+	s := &workloadStorage{
+		conf: conf,
+		gen:  gen,
+	}
+	for _, t := range gen.Tables() {
+		if t.Name == conf.Table {
+			s.table = t
+			break
+		}
+	}
+	if s.table.Name == `` {
+		return nil, errors.Errorf(`unknown table %s for generator %s`, conf.Table, meta.Name)
+	}
+	return s, nil
+}
+
+func (s *workloadStorage) Conf() roachpb.ExternalStorage {
+	return roachpb.ExternalStorage{
+		Provider:       roachpb.ExternalStorageProvider_Workload,
+		WorkloadConfig: s.conf,
+	}
+}
+
+func (s *workloadStorage) ReadFile(_ context.Context, basename string) (io.ReadCloser, error) {
+	if basename != `` {
+		return nil, errors.Errorf(`basenames are not supported by workload storage`)
+	}
+	r := workload.NewCSVRowsReader(s.table, int(s.conf.BatchBegin), int(s.conf.BatchEnd))
+	return ioutil.NopCloser(r), nil
+}
+
+func (s *workloadStorage) WriteFile(_ context.Context, _ string, _ io.ReadSeeker) error {
+	return errors.Errorf(`workload storage does not support writes`)
+}
+
+func (s *workloadStorage) ListFiles(_ context.Context) ([]string, error) {
+	return nil, errors.Errorf(`workload storage does not support listing files`)
+}
+
+func (s *workloadStorage) Delete(_ context.Context, _ string) error {
+	return errors.Errorf(`workload storage does not support deletes`)
+}
+func (s *workloadStorage) Size(_ context.Context, _ string) (int64, error) {
+	return 0, errors.Errorf(`workload storage does not support sizing`)
+}
+func (s *workloadStorage) Close() error {
+	return nil
+}
+
+// ParseWorkloadConfig parses a workload config URI to a proto config.
+func ParseWorkloadConfig(uri *url.URL) (*roachpb.ExternalStorage_Workload, error) {
+	c := &roachpb.ExternalStorage_Workload{}
+	pathParts := strings.Split(strings.Trim(uri.Path, `/`), `/`)
+	if len(pathParts) != 3 {
+		return nil, errors.Errorf(
+			`path must be of the form /<format>/<generator>/<table>
: %s`, uri.Path) + } + c.Format, c.Generator, c.Table = pathParts[0], pathParts[1], pathParts[2] + q := uri.Query() + if _, ok := q[`version`]; !ok { + return nil, errors.New(`parameter version is required`) + } + c.Version = q.Get(`version`) + q.Del(`version`) + if s := q.Get(`row-start`); len(s) > 0 { + q.Del(`row-start`) + var err error + if c.BatchBegin, err = strconv.ParseInt(s, 10, 64); err != nil { + return nil, err + } + } + if e := q.Get(`row-end`); len(e) > 0 { + q.Del(`row-end`) + var err error + if c.BatchEnd, err = strconv.ParseInt(e, 10, 64); err != nil { + return nil, err + } + } + for k, vs := range q { + for _, v := range vs { + c.Flags = append(c.Flags, `--`+k+`=`+v) + } + } + return c, nil +} diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index 70109e2ba736..de6fb156f7b0 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -384,7 +384,7 @@ func TestLint(t *testing.T) { ":!acceptance", ":!ccl/acceptanceccl/backup_test.go", ":!ccl/backupccl/backup_cloud_test.go", - ":!storage/cloud/external_storage_test.go", + ":!storage/cloud", ":!ccl/workloadccl/fixture_test.go", ":!cmd", ":!nightly",
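
For reference while reviewing: a minimal, hypothetical sketch of the workload
URI scheme that ParseWorkloadConfig (above) handles. It assumes the `bank`
generator at version 1.0.0 and its `payload-bytes` flag (both exercised by the
tests in this patch); the exact URI values are illustrative and not part of
the change.

	package main

	import (
		"fmt"
		"net/url"

		"github.com/cockroachdb/cockroach/pkg/storage/cloud"
	)

	func main() {
		// The path encodes /<format>/<generator>/<table>. The `version`
		// parameter is required; `row-start`/`row-end` map to
		// BatchBegin/BatchEnd; all remaining query parameters are passed
		// through to the generator as flags.
		u, err := url.Parse(
			"workload:///csv/bank/bank?version=1.0.0&row-start=0&row-end=100&payload-bytes=100")
		if err != nil {
			panic(err)
		}
		cfg, err := cloud.ParseWorkloadConfig(u)
		if err != nil {
			panic(err)
		}
		// Expected output: csv bank bank 1.0.0 0 100 [--payload-bytes=100]
		fmt.Println(cfg.Format, cfg.Generator, cfg.Table, cfg.Version,
			cfg.BatchBegin, cfg.BatchEnd, cfg.Flags)
	}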