From 026cd419f1778a7da0d92c08deaf0126e41b889e Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Thu, 20 Aug 2020 14:56:24 +0530 Subject: [PATCH] compactor for compacting boltdb files uploaded by shipper --- pkg/loki/loki.go | 6 + pkg/loki/modules.go | 13 + .../stores/shipper/compactor/compactor.go | 91 +++++ pkg/storage/stores/shipper/compactor/table.go | 334 ++++++++++++++++++ .../stores/shipper/compactor/table_test.go | 181 ++++++++++ pkg/storage/stores/shipper/downloads/table.go | 42 +-- .../stores/shipper/shipper_index_client.go | 4 +- pkg/storage/stores/shipper/table_client.go | 2 +- .../stores/shipper/table_client_test.go | 2 +- .../stores/shipper/testutil/testutil.go | 21 ++ .../stores/shipper/uploads/table_test.go | 4 +- pkg/storage/stores/shipper/util/util.go | 118 +++++++ pkg/storage/stores/shipper/util/util_test.go | 77 ++++ 13 files changed, 850 insertions(+), 45 deletions(-) create mode 100644 pkg/storage/stores/shipper/compactor/compactor.go create mode 100644 pkg/storage/stores/shipper/compactor/table.go create mode 100644 pkg/storage/stores/shipper/compactor/table_test.go create mode 100644 pkg/storage/stores/shipper/util/util.go create mode 100644 pkg/storage/stores/shipper/util/util_test.go diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 4d22d8b15d66..591a9ee0495a 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -7,6 +7,8 @@ import ( "fmt" "net/http" + "github.com/grafana/loki/pkg/storage/stores/shipper/compactor" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/modules" "github.com/prometheus/client_golang/prometheus" @@ -61,6 +63,7 @@ type Config struct { RuntimeConfig runtimeconfig.ManagerConfig `yaml:"runtime_config,omitempty"` MemberlistKV memberlist.KVConfig `yaml:"memberlist"` Tracing tracing.Config `yaml:"tracing"` + CompactorConfig compactor.Config `yaml:"compactor,omitempty"` } // RegisterFlags registers flag. 
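For context on how the new block surfaces to operators: the `compactor` section hangs off the root Loki config through the yaml tags on `compactor.Config`, so enabling it is a matter of adding a section like the one unmarshalled below. A minimal sketch, assuming the values shown (which are illustrative, not defaults from this patch):

package main

import (
	"fmt"

	"gopkg.in/yaml.v2"

	"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
)

func main() {
	var cfg compactor.Config
	// hypothetical operator config; keys come from the struct's yaml tags.
	doc := `
working_directory: /loki/compactor
shared_store: gcs
`
	if err := yaml.Unmarshal([]byte(doc), &cfg); err != nil {
		panic(err)
	}

	// prints: /loki/compactor gcs
	fmt.Println(cfg.WorkingDirectory, cfg.SharedStoreType)
}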
@@ -135,6 +138,7 @@ type Loki struct { stopper queryrange.Stopper runtimeConfig *runtimeconfig.Manager memberlistKV *memberlist.KVInitService + compactor *compactor.Compactor httpAuthMiddleware middleware.Interface } @@ -305,6 +309,7 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(Querier, t.initQuerier) mm.RegisterModule(QueryFrontend, t.initQueryFrontend) mm.RegisterModule(TableManager, t.initTableManager) + mm.RegisterModule(Compactor, t.initCompactor) mm.RegisterModule(All, nil) // Add dependencies @@ -317,6 +322,7 @@ func (t *Loki) setupModuleManager() error { Querier: {Store, Ring, Server}, QueryFrontend: {Server, Overrides}, TableManager: {Server}, + Compactor: {Server}, All: {Querier, Ingester, Distributor, TableManager}, } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 7efb5047fdb6..9dd17f727d95 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -9,6 +9,8 @@ import ( "os" "time" + "github.com/grafana/loki/pkg/storage/stores/shipper/compactor" + "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/cache" "github.com/cortexproject/cortex/pkg/chunk/storage" @@ -55,6 +57,7 @@ const ( Store string = "store" TableManager string = "table-manager" MemberlistKV string = "memberlist-kv" + Compactor string = "compactor" All string = "all" ) @@ -360,6 +363,16 @@ func (t *Loki) initMemberlistKV() (services.Service, error) { return t.memberlistKV, nil } +func (t *Loki) initCompactor() (services.Service, error) { + var err error + t.compactor, err = compactor.NewCompactor(t.cfg.CompactorConfig, t.cfg.StorageConfig.Config) + if err != nil { + return nil, err + } + + return t.compactor, nil +} + func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, maxChunkAge time.Duration) (time.Duration, error) { if pc.ObjectType != shipper.FilesystemObjectStoreType && maxLookBackConfig.Nanoseconds() != 0 { return 0, errors.New("it is an error to specify a non zero `query_store_max_look_back_period` value when using any object store other than `filesystem`") diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go new file mode 100644 index 000000000000..cc2783750fa7 --- /dev/null +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -0,0 +1,91 @@ +package compactor + +import ( + "context" + "flag" + "path/filepath" + "strings" + "time" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/storage" + chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" + pkg_util "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/services" + "github.com/go-kit/kit/log/level" + + "github.com/grafana/loki/pkg/storage/stores/shipper" + "github.com/grafana/loki/pkg/storage/stores/util" +) + +type Config struct { + WorkingDirectory string `yaml:"working_directory"` + SharedStoreType string `yaml:"shared_store"` +} + +// RegisterFlags registers flags. +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.StringVar(&cfg.WorkingDirectory, "working-directory", "", "Directory where files can be downloaded for compaction.") + f.StringVar(&cfg.SharedStoreType, "shared-store", "", "Shared store used for storing boltdb files. 
Supported types: gcs, s3, azure, swift, filesystem")
+}
+
+type Compactor struct {
+	services.Service
+
+	cfg          Config
+	objectClient chunk.ObjectClient
+}
+
+func NewCompactor(cfg Config, storageConfig storage.Config) (*Compactor, error) {
+	objectClient, err := storage.NewObjectClient(cfg.SharedStoreType, storageConfig)
+	if err != nil {
+		return nil, err
+	}
+
+	err = chunk_util.EnsureDirectory(cfg.WorkingDirectory)
+	if err != nil {
+		return nil, err
+	}
+
+	compactor := Compactor{
+		cfg:          cfg,
+		objectClient: util.NewPrefixedObjectClient(objectClient, shipper.StorageKeyPrefix),
+	}
+
+	compactor.Service = services.NewTimerService(4*time.Hour, nil, compactor.Run, nil)
+	return &compactor, nil
+}
+
+func (c *Compactor) Run(ctx context.Context) error {
+	_, dirs, err := c.objectClient.List(ctx, "")
+	if err != nil {
+		return err
+	}
+
+	tables := make([]string, len(dirs))
+	for i, dir := range dirs {
+		tables[i] = strings.TrimSuffix(string(dir), "/")
+	}
+
+	for _, tableName := range tables {
+		table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), c.objectClient)
+		if err != nil {
+			level.Error(pkg_util.Logger).Log("msg", "failed to initialize table for compaction", "err", err)
+			continue
+		}
+
+		err = table.compact()
+		if err != nil {
+			level.Error(pkg_util.Logger).Log("msg", "failed to compact files", "err", err)
+		}
+
+		// check if the context was cancelled before going on to the next table.
+		select {
+		case <-ctx.Done():
+			return nil
+		default:
+		}
+	}
+
+	return nil
+}
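One detail worth spelling out in `Run` above: the prefixed object client lists the top-level directories under `index/` as common prefixes ending in `/`, and trimming that suffix yields the table names to compact. A self-contained sketch of that derivation (the prefix values are invented for illustration):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// hypothetical common prefixes, one per index table under "index/".
	dirs := []string{"index_2710/", "index_2711/"}

	tables := make([]string, len(dirs))
	for i, dir := range dirs {
		tables[i] = strings.TrimSuffix(dir, "/")
	}

	fmt.Println(tables) // [index_2710 index_2711]
}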
diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/shipper/compactor/table.go
new file mode 100644
index 000000000000..cecabf64dd61
--- /dev/null
+++ b/pkg/storage/stores/shipper/compactor/table.go
@@ -0,0 +1,334 @@
+package compactor
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"os"
+	"path/filepath"
+	"time"
+
+	"github.com/cortexproject/cortex/pkg/chunk"
+	"github.com/cortexproject/cortex/pkg/chunk/local"
+	chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
+	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/go-kit/kit/log/level"
+	"go.etcd.io/bbolt"
+
+	shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
+)
+
+const (
+	compactMinDBs = 4
+	uploaderName  = "compactor"
+
+	readDBsParallelism = 50
+	batchSize          = 1000
+)
+
+var bucketName = []byte("index")
+
+type indexEntry struct {
+	k, v []byte
+}
+
+type table struct {
+	name             string
+	workingDirectory string
+	storageClient    chunk.ObjectClient
+
+	compactedDB *bbolt.DB
+
+	ctx  context.Context
+	quit chan struct{}
+}
+
+func newTable(ctx context.Context, workingDirectory string, objectClient chunk.ObjectClient) (*table, error) {
+	err := chunk_util.EnsureDirectory(workingDirectory)
+	if err != nil {
+		return nil, err
+	}
+
+	table := table{
+		ctx:              ctx,
+		name:             filepath.Base(workingDirectory),
+		workingDirectory: workingDirectory,
+		storageClient:    objectClient,
+		quit:             make(chan struct{}),
+	}
+
+	return &table, nil
+}
+
+func (t *table) compact() error {
+	objects, _, err := t.storageClient.List(t.ctx, t.name)
+	if err != nil {
+		return err
+	}
+
+	level.Info(util.Logger).Log("msg", "listed files", "count", len(objects))
+
+	if len(objects) < compactMinDBs {
+		level.Info(util.Logger).Log("msg", fmt.Sprintf("skipping compaction since we have just %d files in storage", len(objects)))
+		return nil
+	}
+
+	defer func() {
+		err := t.cleanup()
+		if err != nil {
+			level.Error(util.Logger).Log("msg", "failed to cleanup table", "name", t.name)
+		}
+	}()
+
+	t.compactedDB, err = local.OpenBoltdbFile(filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix())))
+	if err != nil {
+		return err
+	}
+
+	level.Info(util.Logger).Log("msg", "starting compaction of dbs")
+
+	errChan := make(chan error)
+	readObjectChan := make(chan string)
+	n := util.Min(len(objects), readDBsParallelism)
+
+	// read files in parallel
+	for i := 0; i < n; i++ {
+		go func() {
+			var err error
+			defer func() {
+				errChan <- err
+			}()
+
+			for {
+				select {
+				case objectKey, ok := <-readObjectChan:
+					if !ok {
+						return
+					}
+
+					var dbName string
+					dbName, err = shipper_util.GetDBNameFromObjectKey(objectKey)
+					if err != nil {
+						return
+					}
+
+					downloadAt := filepath.Join(t.workingDirectory, dbName)
+
+					err = shipper_util.GetFileFromStorage(t.ctx, t.storageClient, objectKey, downloadAt)
+					if err != nil {
+						return
+					}
+
+					err = t.readFile(downloadAt)
+					if err != nil {
+						level.Error(util.Logger).Log("msg", "error reading file", "err", err)
+						return
+					}
+				case <-t.quit:
+					return
+				case <-t.ctx.Done():
+					return
+				}
+			}
+
+		}()
+	}
+
+	// send all files to readObjectChan
+	go func() {
+	sendLoop:
+		for _, object := range objects {
+			select {
+			case readObjectChan <- object.Key:
+			case <-t.quit:
+				// a bare break would only exit the select, not the loop.
+				break sendLoop
+			case <-t.ctx.Done():
+				break sendLoop
+			}
+		}
+
+		level.Debug(util.Logger).Log("msg", "closing readObjectChan")
+
+		close(readObjectChan)
+	}()
+
+	var firstErr error
+
+	// read all the errors
+	for i := 0; i < n; i++ {
+		err := <-errChan
+		if err != nil && firstErr == nil {
+			firstErr = err
+			close(t.quit)
+		}
+	}
+
+	if firstErr != nil {
+		return firstErr
+	}
+
+	// check whether we stopped compaction due to the context being cancelled.
+	select {
+	case <-t.ctx.Done():
+		return nil
+	default:
+	}
+
+	level.Info(util.Logger).Log("msg", "finished compacting the dbs")
+
+	// upload the compacted db
+	err = t.upload()
+	if err != nil {
+		return err
+	}
+
+	// remove source files from storage which were compacted
+	return t.removeObjectsFromStorage(objects)
+}
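`compact` uses a conventional bounded-parallelism layout: one producer goroutine feeds object keys into an unbuffered channel, up to `readDBsParallelism` workers drain it, and every worker sends exactly one (possibly nil) error, so the collection loop knows exactly how many receives to expect. A stripped-down, runnable sketch of the same shape (the names here are illustrative, not from the patch):

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	keys := []string{"a", "b", "c", "d", "e"}
	n := 3 // worker count, like readDBsParallelism capped at len(objects)

	keyChan := make(chan string)
	errChan := make(chan error)

	var processed int64
	for i := 0; i < n; i++ {
		go func() {
			var err error
			defer func() { errChan <- err }() // exactly one send per worker

			for key := range keyChan {
				_ = key // stand-in for download + readFile
				atomic.AddInt64(&processed, 1)
			}
		}()
	}

	go func() {
		for _, k := range keys {
			keyChan <- k
		}
		close(keyChan) // lets the workers' range loops terminate
	}()

	var firstErr error
	for i := 0; i < n; i++ {
		if err := <-errChan; err != nil && firstErr == nil {
			firstErr = err
		}
	}

	fmt.Println(processed, firstErr) // 5 <nil>
}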
+
+func (t *table) cleanup() error {
+	if t.compactedDB != nil {
+		err := t.compactedDB.Close()
+		if err != nil {
+			return err
+		}
+	}
+
+	return os.RemoveAll(t.workingDirectory)
+}
+
+// writeBatch writes a batch to compactedDB
+func (t *table) writeBatch(batch []indexEntry) error {
+	return t.compactedDB.Batch(func(tx *bbolt.Tx) error {
+		b, err := tx.CreateBucketIfNotExists(bucketName)
+		if err != nil {
+			return err
+		}
+
+		for _, w := range batch {
+			err = b.Put(w.k, w.v)
+			if err != nil {
+				return err
+			}
+		}
+
+		return nil
+	})
+}
+
+// readFile reads a boltdb file from a path and writes the index in batched mode to compactedDB
+func (t *table) readFile(path string) error {
+	level.Debug(util.Logger).Log("msg", "reading file for compaction", "path", path)
+
+	db, err := local.OpenBoltdbFile(path)
+	if err != nil {
+		return err
+	}
+
+	defer func() {
+		if err := db.Close(); err != nil {
+			level.Error(util.Logger).Log("msg", "failed to close db", "path", path, "err", err)
+		}
+
+		if err = os.Remove(path); err != nil {
+			level.Error(util.Logger).Log("msg", "failed to remove file", "path", path, "err", err)
+		}
+	}()
+
+	writeBatch := make([]indexEntry, 0, batchSize)
+
+	return db.View(func(tx *bbolt.Tx) error {
+		b := tx.Bucket(bucketName)
+		if b == nil {
+			return errors.New("bucket not found")
+		}
+
+		err := b.ForEach(func(k, v []byte) error {
+			ie := indexEntry{
+				k: make([]byte, len(k)),
+				v: make([]byte, len(v)),
+			}
+
+			// make a copy since k, v are only valid for the life of the transaction.
+			// See: https://godoc.org/github.com/boltdb/bolt#Cursor.Seek
+			copy(ie.k, k)
+			copy(ie.v, v)
+
+			writeBatch = append(writeBatch, ie)
+
+			if len(writeBatch) == cap(writeBatch) {
+				// batch is full, write the batch and create a new one.
+				err := t.writeBatch(writeBatch)
+				if err != nil {
+					return err
+				}
+
+				writeBatch = make([]indexEntry, 0, batchSize)
+			}
+
+			return nil
+		})
+		if err != nil {
+			return err
+		}
+
+		// write the remaining batch which might not have filled up yet.
+		return t.writeBatch(writeBatch)
+	})
+}
+
+// upload uploads the compacted db in compressed format.
+func (t *table) upload() error {
+	compactedDBPath := t.compactedDB.Path()
+
+	// close the compactedDB to make sure all the writes are processed.
+	err := t.compactedDB.Close()
+	if err != nil {
+		return err
+	}
+
+	t.compactedDB = nil
+
+	// compress the compactedDB.
+	compressedDBPath := fmt.Sprintf("%s.gz", compactedDBPath)
+	err = shipper_util.CompressFile(compactedDBPath, compressedDBPath)
+	if err != nil {
+		return err
+	}
+
+	// open the compressed file for reading.
+	compressedDB, err := os.Open(compressedDBPath)
+	if err != nil {
+		return err
+	}
+
+	defer func() {
+		if err := compressedDB.Close(); err != nil {
+			level.Error(util.Logger).Log("msg", "failed to close file", "path", compressedDBPath, "err", err)
+		}
+
+		if err := os.Remove(compressedDBPath); err != nil {
+			level.Error(util.Logger).Log("msg", "failed to remove file", "path", compressedDBPath, "err", err)
+		}
+	}()
+
+	objectKey := fmt.Sprintf("%s.gz", shipper_util.BuildObjectKey(t.name, uploaderName, fmt.Sprint(time.Now().Unix())))
+	level.Info(util.Logger).Log("msg", "uploading the compacted file", "objectKey", objectKey)
+
+	return t.storageClient.PutObject(t.ctx, objectKey, compressedDB)
+}
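The `.gz` object written by `upload` follows the shipper's naming scheme via `BuildObjectKey` (defined later in this patch), so a compacted table ends up as a single object such as `index_2710/compactor-1597922784.gz`. A small sketch with an invented table name and timestamp, using a local mirror of the helper:

package main

import "fmt"

// buildObjectKey mirrors shipper_util.BuildObjectKey from this patch for the
// non-migrated case: files are stored as <table-name>/<uploader>-<db-name>.
func buildObjectKey(tableName, uploader, dbName string) string {
	return fmt.Sprintf("%s/%s-%s", tableName, uploader, dbName)
}

func main() {
	key := fmt.Sprintf("%s.gz", buildObjectKey("index_2710", "compactor", "1597922784"))
	fmt.Println(key) // index_2710/compactor-1597922784.gz
}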
+
+// removeObjectsFromStorage deletes objects from storage.
+func (t *table) removeObjectsFromStorage(objects []chunk.StorageObject) error {
+	level.Info(util.Logger).Log("msg", "removing source db files from storage", "count", len(objects))
+
+	for _, object := range objects {
+		err := t.storageClient.DeleteObject(t.ctx, object.Key)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
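A note on the choice of `db.Batch` over `db.Update` in `writeBatch` above: `readFile` runs on several worker goroutines at once, and bbolt's `Batch` coalesces concurrent writers into shared transactions, amortizing fsyncs. A minimal, self-contained sketch of that usage (not from the patch; error handling shortened to panics):

package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"sync"

	"go.etcd.io/bbolt"
)

func main() {
	dir, err := ioutil.TempDir("", "batch-demo")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	db, err := bbolt.Open(filepath.Join(dir, "db"), 0666, nil)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	bucket := []byte("index")

	// several goroutines writing via Batch, as the compactor's workers do;
	// bbolt merges concurrent Batch calls into fewer write transactions.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			err := db.Batch(func(tx *bbolt.Tx) error {
				b, err := tx.CreateBucketIfNotExists(bucket)
				if err != nil {
					return err
				}
				return b.Put([]byte(fmt.Sprintf("k%d", i)), []byte("v"))
			})
			if err != nil {
				panic(err)
			}
		}(i)
	}
	wg.Wait()

	if err := db.View(func(tx *bbolt.Tx) error {
		fmt.Println(tx.Bucket(bucket).Stats().KeyN) // 4
		return nil
	}); err != nil {
		panic(err)
	}
}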
diff --git a/pkg/storage/stores/shipper/compactor/table_test.go b/pkg/storage/stores/shipper/compactor/table_test.go
new file mode 100644
index 000000000000..19f037e26d72
--- /dev/null
+++ b/pkg/storage/stores/shipper/compactor/table_test.go
@@ -0,0 +1,181 @@
+package compactor
+
+import (
+	"context"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+
+	"github.com/cortexproject/cortex/pkg/chunk/local"
+	"github.com/stretchr/testify/require"
+	"go.etcd.io/bbolt"
+
+	"github.com/grafana/loki/pkg/storage/stores/shipper/testutil"
+)
+
+const (
+	objectsStorageDirName = "objects"
+	workingDirName        = "working-dir"
+)
+
+func TestTable_Compaction(t *testing.T) {
+	tempDir, err := ioutil.TempDir("", "table-compaction")
+	require.NoError(t, err)
+
+	defer func() {
+		require.NoError(t, os.RemoveAll(tempDir))
+	}()
+
+	tableName := "test"
+	objectStoragePath := filepath.Join(tempDir, objectsStorageDirName)
+	tablePathInStorage := filepath.Join(objectStoragePath, tableName)
+	tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName)
+
+	// setup some dbs
+	numDBs := compactMinDBs * 2
+	numRecordsPerDB := 100
+
+	dbsToSetup := make(map[string]testutil.DBRecords)
+	for i := 0; i < numDBs; i++ {
+		dbsToSetup[fmt.Sprint(i)] = testutil.DBRecords{
+			Start:      i * numRecordsPerDB,
+			NumRecords: (i + 1) * numRecordsPerDB,
+		}
+	}
+
+	testutil.SetupDBTablesAtPath(t, tableName, objectStoragePath, dbsToSetup, true)
+
+	// setup an exact copy of the dbs for comparison.
+	testutil.SetupDBTablesAtPath(t, "test-copy", objectStoragePath, dbsToSetup, false)
+
+	// do the compaction
+	objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath})
+	require.NoError(t, err)
+
+	table, err := newTable(context.Background(), tableWorkingDirectory, objectClient)
+	require.NoError(t, err)
+
+	require.NoError(t, table.compact())
+
+	// verify that we have only 1 file left in storage after compaction.
+	files, err := ioutil.ReadDir(tablePathInStorage)
+	require.NoError(t, err)
+	require.Len(t, files, 1)
+	require.True(t, strings.HasSuffix(files[0].Name(), ".gz"))
+
+	// verify that the compacted db has all the kvs which were there in the source dbs.
+	compareCompactedDB(t, filepath.Join(tablePathInStorage, files[0].Name()), filepath.Join(objectStoragePath, "test-copy"))
+}
+
+func TestTable_CompactionFailure(t *testing.T) {
+	tempDir, err := ioutil.TempDir("", "table-compaction-failure")
+	require.NoError(t, err)
+
+	defer func() {
+		require.NoError(t, os.RemoveAll(tempDir))
+	}()
+
+	tableName := "test"
+	objectStoragePath := filepath.Join(tempDir, objectsStorageDirName)
+	tablePathInStorage := filepath.Join(objectStoragePath, tableName)
+	tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName)
+
+	// setup some dbs
+	numDBs := compactMinDBs * 2
+	numRecordsPerDB := 100
+
+	dbsToSetup := make(map[string]testutil.DBRecords)
+	for i := 0; i < numDBs; i++ {
+		dbsToSetup[fmt.Sprint(i)] = testutil.DBRecords{
+			Start:      i * numRecordsPerDB,
+			NumRecords: (i + 1) * numRecordsPerDB,
+		}
+	}
+
+	testutil.SetupDBTablesAtPath(t, tableName, objectStoragePath, dbsToSetup, true)
+
+	// put a non-boltdb file in the table, which should make the compaction fail midway since opening it with the boltdb client would fail.
+	require.NoError(t, ioutil.WriteFile(filepath.Join(tablePathInStorage, "fail.txt"), []byte("fail the compaction"), 0666))
+
+	// do the compaction
+	objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath})
+	require.NoError(t, err)
+
+	table, err := newTable(context.Background(), tableWorkingDirectory, objectClient)
+	require.NoError(t, err)
+
+	// compaction should fail due to the non-boltdb file.
+	require.Error(t, table.compact())
+
+	// ensure that the files in storage are intact.
+	files, err := ioutil.ReadDir(tablePathInStorage)
+	require.NoError(t, err)
+	require.Len(t, files, numDBs+1)
+
+	// ensure that we have cleaned up the local working directory after the failed compaction.
+	require.NoFileExists(t, tableWorkingDirectory)
+
+	// remove the non-boltdb file and ensure that compaction succeeds now.
+	require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, "fail.txt")))
+
+	table, err = newTable(context.Background(), tableWorkingDirectory, objectClient)
+	require.NoError(t, err)
+	require.NoError(t, table.compact())
+
+	// ensure that we have cleaned up the local working directory after the successful compaction.
+ require.NoFileExists(t, tableWorkingDirectory) +} + +func compareCompactedDB(t *testing.T, compactedDBPath string, sourceDBsPath string) { + tempDir, err := ioutil.TempDir("", "compare-compacted-db") + require.NoError(t, err) + + defer func() { + require.NoError(t, os.RemoveAll(tempDir)) + }() + + decompressedFilePath := filepath.Join(tempDir, filepath.Base(compactedDBPath)) + testutil.DecompressFile(t, compactedDBPath, decompressedFilePath) + + compactedDB, err := local.OpenBoltdbFile(decompressedFilePath) + require.NoError(t, err) + + defer func() { + require.NoError(t, compactedDB.Close()) + }() + + sourceFiles, err := ioutil.ReadDir(sourceDBsPath) + require.NoError(t, err) + + err = compactedDB.View(func(tx *bbolt.Tx) error { + compactedBucket := tx.Bucket(bucketName) + require.NotNil(t, compactedBucket) + + for _, file := range sourceFiles { + srcDB, err := local.OpenBoltdbFile(filepath.Join(sourceDBsPath, file.Name())) + require.NoError(t, err) + + err = srcDB.View(func(tx *bbolt.Tx) error { + srcBucket := tx.Bucket(bucketName) + require.NotNil(t, srcBucket) + + return srcBucket.ForEach(func(k, v []byte) error { + val := compactedBucket.Get(k) + require.NotNil(t, val) + require.Equal(t, v, val) + return nil + }) + + }) + require.NoError(t, err) + + require.NoError(t, srcDB.Close()) + } + return nil + }) + + require.NoError(t, err) +} diff --git a/pkg/storage/stores/shipper/downloads/table.go b/pkg/storage/stores/shipper/downloads/table.go index 2bbb94128423..5c80443b2cc9 100644 --- a/pkg/storage/stores/shipper/downloads/table.go +++ b/pkg/storage/stores/shipper/downloads/table.go @@ -18,7 +18,7 @@ import ( "github.com/go-kit/kit/log/level" "go.etcd.io/bbolt" - "github.com/grafana/loki/pkg/chunkenc" + shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" ) // timeout for downloading initial files for a table to avoid leaking resources by allowing it to take all the time. @@ -341,7 +341,7 @@ func (t *Table) downloadFile(ctx context.Context, storageObject chunk.StorageObj // download the file temporarily with some other name to allow boltdb client to close the existing file first if it exists tempFilePath := path.Join(folderPath, fmt.Sprintf("%s.%s", dbName, "temp")) - err = t.getFileFromStorage(ctx, storageObject.Key, tempFilePath) + err = shipper_util.GetFileFromStorage(ctx, t.storageClient, storageObject.Key, tempFilePath) if err != nil { return err } @@ -375,42 +375,6 @@ func (t *Table) downloadFile(ctx context.Context, storageObject chunk.StorageObj return nil } -// getFileFromStorage downloads a file from storage to given location. 
-func (t *Table) getFileFromStorage(ctx context.Context, objectKey, destination string) error { - readCloser, err := t.storageClient.GetObject(ctx, objectKey) - if err != nil { - return err - } - - defer func() { - if err := readCloser.Close(); err != nil { - level.Error(util.Logger) - } - }() - - f, err := os.Create(destination) - if err != nil { - return err - } - - var objectReader io.Reader = readCloser - if strings.HasSuffix(objectKey, ".gz") { - decompressedReader := chunkenc.Gzip.GetReader(readCloser) - defer chunkenc.Gzip.PutReader(decompressedReader) - - objectReader = decompressedReader - } - - _, err = io.Copy(f, objectReader) - if err != nil { - return err - } - - level.Info(util.Logger).Log("msg", fmt.Sprintf("downloaded file %s", objectKey)) - - return f.Sync() -} - func (t *Table) folderPathForTable(ensureExists bool) (string, error) { folderPath := path.Join(t.cacheLocation, t.name) @@ -463,7 +427,7 @@ func (t *Table) doParallelDownload(ctx context.Context, objects []chunk.StorageO } filePath := path.Join(folderPathForTable, dbName) - err = t.getFileFromStorage(ctx, object.Key, filePath) + err = shipper_util.GetFileFromStorage(ctx, t.storageClient, object.Key, filePath) if err != nil { break } diff --git a/pkg/storage/stores/shipper/shipper_index_client.go b/pkg/storage/stores/shipper/shipper_index_client.go index 718a6deac922..c12f98cea89b 100644 --- a/pkg/storage/stores/shipper/shipper_index_client.go +++ b/pkg/storage/stores/shipper/shipper_index_client.go @@ -38,7 +38,7 @@ const ( // FilesystemObjectStoreType holds the periodic config type for the filesystem store FilesystemObjectStoreType = "filesystem" - storageKeyPrefix = "index/" + StorageKeyPrefix = "index/" // UploadInterval defines interval for uploading active boltdb files from local which are being written to by ingesters. 
UploadInterval = 15 * time.Minute @@ -112,7 +112,7 @@ func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.R return err } - prefixedObjectClient := util.NewPrefixedObjectClient(storageClient, storageKeyPrefix) + prefixedObjectClient := util.NewPrefixedObjectClient(storageClient, StorageKeyPrefix) if s.cfg.Mode != ModeReadOnly { cfg := uploads.Config{ diff --git a/pkg/storage/stores/shipper/table_client.go b/pkg/storage/stores/shipper/table_client.go index ba0fdee8002c..500fe4630cb6 100644 --- a/pkg/storage/stores/shipper/table_client.go +++ b/pkg/storage/stores/shipper/table_client.go @@ -18,7 +18,7 @@ type boltDBShipperTableClient struct { } func NewBoltDBShipperTableClient(objectClient chunk.ObjectClient) chunk.TableClient { - return &boltDBShipperTableClient{util.NewPrefixedObjectClient(objectClient, storageKeyPrefix)} + return &boltDBShipperTableClient{util.NewPrefixedObjectClient(objectClient, StorageKeyPrefix)} } func (b *boltDBShipperTableClient) ListTables(ctx context.Context) ([]string, error) { diff --git a/pkg/storage/stores/shipper/table_client_test.go b/pkg/storage/stores/shipper/table_client_test.go index 938b2c72db5f..04454c1e6197 100644 --- a/pkg/storage/stores/shipper/table_client_test.go +++ b/pkg/storage/stores/shipper/table_client_test.go @@ -36,7 +36,7 @@ func TestBoltDBShipperTableClient(t *testing.T) { } // we need to use prefixed object client while creating files/folder - prefixedObjectClient := util.NewPrefixedObjectClient(objectClient, storageKeyPrefix) + prefixedObjectClient := util.NewPrefixedObjectClient(objectClient, StorageKeyPrefix) for folder, files := range foldersWithFiles { for _, fileName := range files { diff --git a/pkg/storage/stores/shipper/testutil/testutil.go b/pkg/storage/stores/shipper/testutil/testutil.go index c6e8bffff54d..9d484d73775b 100644 --- a/pkg/storage/stores/shipper/testutil/testutil.go +++ b/pkg/storage/stores/shipper/testutil/testutil.go @@ -173,3 +173,24 @@ func compressFile(t *testing.T, filepath string) { require.NoError(t, compressedFile.Close()) require.NoError(t, os.Remove(filepath)) } + +func DecompressFile(t *testing.T, src, dest string) { + // open compressed file from storage + compressedFile, err := os.Open(src) + require.NoError(t, err) + + // get a compressed reader + compressedReader, err := gzip.NewReader(compressedFile) + require.NoError(t, err) + + decompressedFile, err := os.Create(dest) + require.NoError(t, err) + + // do the decompression + _, err = io.Copy(decompressedFile, compressedReader) + require.NoError(t, err) + + // close the references + require.NoError(t, compressedFile.Close()) + require.NoError(t, decompressedFile.Close()) +} diff --git a/pkg/storage/stores/shipper/uploads/table_test.go b/pkg/storage/stores/shipper/uploads/table_test.go index 7c7126bb8b5e..4aaac1649c07 100644 --- a/pkg/storage/stores/shipper/uploads/table_test.go +++ b/pkg/storage/stores/shipper/uploads/table_test.go @@ -346,7 +346,7 @@ func Test_LoadBoltDBsFromDir(t *testing.T) { Start: 10, NumRecords: 10, }, - }) + }, false) // try loading the dbs dbs, err := loadBoltDBsFromDir(tablePath) @@ -395,7 +395,7 @@ func TestTable_ImmutableUploads(t *testing.T) { } // setup some dbs for a table at a path. 
-	tablePath := testutil.SetupDBTablesAtPath(t, "test-table", indexPath, dbs)
+	tablePath := testutil.SetupDBTablesAtPath(t, "test-table", indexPath, dbs, false)
 
 	table, err := LoadTable(tablePath, "test", storageClient, boltDBIndexClient)
 	require.NoError(t, err)
diff --git a/pkg/storage/stores/shipper/util/util.go b/pkg/storage/stores/shipper/util/util.go
new file mode 100644
index 000000000000..e6e374d5c27a
--- /dev/null
+++ b/pkg/storage/stores/shipper/util/util.go
@@ -0,0 +1,118 @@
+package util
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"strings"
+
+	"github.com/grafana/loki/pkg/chunkenc"
+
+	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/go-kit/kit/log/level"
+)
+
+type StorageClient interface {
+	GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error)
+}
+
+// GetFileFromStorage downloads a file from storage to a given location.
+func GetFileFromStorage(ctx context.Context, storageClient StorageClient, objectKey, destination string) error {
+	readCloser, err := storageClient.GetObject(ctx, objectKey)
+	if err != nil {
+		return err
+	}
+
+	defer func() {
+		if err := readCloser.Close(); err != nil {
+			level.Error(util.Logger).Log("msg", "failed to close object reader", "err", err)
+		}
+	}()
+
+	f, err := os.Create(destination)
+	if err != nil {
+		return err
+	}
+
+	var objectReader io.Reader = readCloser
+	if strings.HasSuffix(objectKey, ".gz") {
+		decompressedReader := chunkenc.Gzip.GetReader(readCloser)
+		defer chunkenc.Gzip.PutReader(decompressedReader)
+
+		objectReader = decompressedReader
+	}
+
+	_, err = io.Copy(f, objectReader)
+	if err != nil {
+		return err
+	}
+
+	level.Info(util.Logger).Log("msg", fmt.Sprintf("downloaded file %s", objectKey))
+
+	return f.Sync()
+}
+
+func GetDBNameFromObjectKey(objectKey string) (string, error) {
+	ss := strings.Split(objectKey, "/")
+
+	if len(ss) != 2 {
+		return "", fmt.Errorf("invalid object key: %v", objectKey)
+	}
+	if ss[1] == "" {
+		return "", fmt.Errorf("empty db name, object key: %v", objectKey)
+	}
+	return ss[1], nil
+}
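`GetDBNameFromObjectKey` assumes keys are exactly one directory deep, i.e. `<table>/<file>`, which holds for both uploader and compactor objects. A quick standalone illustration of the accepted and rejected shapes, mirroring the helper above:

package main

import (
	"fmt"
	"strings"
)

// getDBNameFromObjectKey mirrors the helper above: a valid key is exactly
// "<table>/<db>"; anything else is rejected.
func getDBNameFromObjectKey(objectKey string) (string, error) {
	ss := strings.Split(objectKey, "/")
	if len(ss) != 2 {
		return "", fmt.Errorf("invalid object key: %v", objectKey)
	}
	if ss[1] == "" {
		return "", fmt.Errorf("empty db name, object key: %v", objectKey)
	}
	return ss[1], nil
}

func main() {
	fmt.Println(getDBNameFromObjectKey("index_2710/compactor-1597922784.gz")) // compactor-1597922784.gz <nil>
	fmt.Println(getDBNameFromObjectKey("index_2710/"))                        // "" plus an "empty db name" error
	fmt.Println(getDBNameFromObjectKey("a/b/c"))                              // "" plus an "invalid object key" error
}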
+
+func BuildObjectKey(tableName, uploader, dbName string) string {
+	// Files are stored with <table-name>/<uploader>-<db-name>
+	objectKey := fmt.Sprintf("%s/%s-%s", tableName, uploader, dbName)
+
+	// if the file is a migrated one, don't add its name to the object key, otherwise we would re-upload it with a different name.
+	if tableName == dbName {
+		objectKey = fmt.Sprintf("%s/%s", tableName, uploader)
+	}
+
+	return objectKey
+}
+
+func CompressFile(src, dest string) error {
+	level.Info(util.Logger).Log("msg", "compressing the file", "src", src, "dest", dest)
+	uncompressedFile, err := os.Open(src)
+	if err != nil {
+		return err
+	}
+
+	defer func() {
+		if err := uncompressedFile.Close(); err != nil {
+			level.Error(util.Logger).Log("msg", "failed to close uncompressed file", "path", src, "err", err)
+		}
+	}()
+
+	compressedFile, err := os.Create(dest)
+	if err != nil {
+		return err
+	}
+
+	defer func() {
+		if err := compressedFile.Close(); err != nil {
+			level.Error(util.Logger).Log("msg", "failed to close compressed file", "path", dest, "err", err)
+		}
+	}()
+
+	compressedWriter := chunkenc.Gzip.GetWriter(compressedFile)
+	defer chunkenc.Gzip.PutWriter(compressedWriter)
+
+	_, err = io.Copy(compressedWriter, uncompressedFile)
+	if err != nil {
+		return err
+	}
+
+	err = compressedWriter.Close()
+	if err != nil {
+		return err
+	}
+
+	return compressedFile.Sync()
+}
diff --git a/pkg/storage/stores/shipper/util/util_test.go b/pkg/storage/stores/shipper/util/util_test.go
new file mode 100644
index 000000000000..9aaeb3c1e325
--- /dev/null
+++ b/pkg/storage/stores/shipper/util/util_test.go
@@ -0,0 +1,77 @@
+package util
+
+import (
+	"context"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/cortexproject/cortex/pkg/chunk/local"
+	"github.com/grafana/loki/pkg/storage/stores/shipper/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+func Test_GetFileFromStorage(t *testing.T) {
+	tempDir, err := ioutil.TempDir("", "get-file-from-storage")
+	require.NoError(t, err)
+
+	defer func() {
+		require.NoError(t, os.RemoveAll(tempDir))
+	}()
+
+	// write a file to storage.
+	testData := []byte("test-data")
+	require.NoError(t, ioutil.WriteFile(filepath.Join(tempDir, "src"), testData, 0666))
+
+	// try downloading the file from the storage.
+	objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: tempDir})
+	require.NoError(t, err)
+
+	require.NoError(t, GetFileFromStorage(context.Background(), objectClient, "src", filepath.Join(tempDir, "dest")))
+
+	// verify the contents of the downloaded file.
+	b, err := ioutil.ReadFile(filepath.Join(tempDir, "dest"))
+	require.NoError(t, err)
+
+	require.Equal(t, testData, b)
+
+	// compress the file in storage
+	err = CompressFile(filepath.Join(tempDir, "src"), filepath.Join(tempDir, "src.gz"))
+	require.NoError(t, err)
+
+	// get the compressed file from storage
+	require.NoError(t, GetFileFromStorage(context.Background(), objectClient, "src.gz", filepath.Join(tempDir, "dest.gz")))
+
+	// verify the contents of the downloaded gz file.
+ b, err = ioutil.ReadFile(filepath.Join(tempDir, "dest.gz")) + require.NoError(t, err) + + require.Equal(t, testData, b) +} + +func Test_CompressFile(t *testing.T) { + tempDir, err := ioutil.TempDir("", "table-compaction") + require.NoError(t, err) + + defer func() { + require.NoError(t, os.RemoveAll(tempDir)) + }() + + uncompressedFilePath := filepath.Join(tempDir, "test-file") + compressedFilePath := filepath.Join(tempDir, "test-file.gz") + decompressedFilePath := filepath.Join(tempDir, "test-file-decompressed") + + testData := []byte("test-data") + + require.NoError(t, ioutil.WriteFile(uncompressedFilePath, testData, 0666)) + + require.NoError(t, CompressFile(uncompressedFilePath, compressedFilePath)) + require.FileExists(t, compressedFilePath) + + testutil.DecompressFile(t, compressedFilePath, decompressedFilePath) + b, err := ioutil.ReadFile(decompressedFilePath) + require.NoError(t, err) + + require.Equal(t, testData, b) +}
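Taken together, `CompressFile` and the `.gz` branch of `GetFileFromStorage` form a symmetric compress-on-upload / decompress-on-download path, which the tests above exercise end to end. A stdlib-only sketch of the same round trip (the patch itself goes through `chunkenc.Gzip`'s pooled readers and writers; this simplification is mine):

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io/ioutil"
)

func main() {
	src := []byte("test-data")

	// compress, as CompressFile does.
	var compressed bytes.Buffer
	zw := gzip.NewWriter(&compressed)
	if _, err := zw.Write(src); err != nil {
		panic(err)
	}
	// Close flushes the gzip footer; skipping it (or ignoring its error, as
	// the pre-fix CompressFile effectively did) can corrupt the stream.
	if err := zw.Close(); err != nil {
		panic(err)
	}

	// decompress, as GetFileFromStorage does for keys ending in ".gz".
	zr, err := gzip.NewReader(&compressed)
	if err != nil {
		panic(err)
	}
	out, err := ioutil.ReadAll(zr)
	if err != nil {
		panic(err)
	}

	fmt.Println(bytes.Equal(src, out)) // true
}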