diff --git a/docs/operations/storage/boltdb-shipper.md b/docs/operations/storage/boltdb-shipper.md index f6e70d7d217a..56911c179e94 100644 --- a/docs/operations/storage/boltdb-shipper.md +++ b/docs/operations/storage/boltdb-shipper.md @@ -26,8 +26,9 @@ storage_config: gcs: bucket_name: GCS_BUCKET_NAME - boltdb_shipper_config: + boltdb_shipper: active_index_directory: /loki/index + shared_store: gcs cache_location: /loki/boltdb-cache ``` diff --git a/pkg/logcli/query/query.go b/pkg/logcli/query/query.go index b7569c4eebe9..6700dd3bc662 100644 --- a/pkg/logcli/query/query.go +++ b/pkg/logcli/query/query.go @@ -12,6 +12,7 @@ import ( "github.com/fatih/color" json "github.com/json-iterator/go" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/promql" "github.com/weaveworks/common/user" @@ -148,7 +149,7 @@ func localStore(conf loki.Config) (logql.Querier, error) { if err != nil { return nil, err } - s, err := storage.NewStore(conf.StorageConfig, conf.ChunkStoreConfig, conf.SchemaConfig, limits) + s, err := storage.NewStore(conf.StorageConfig, conf.ChunkStoreConfig, conf.SchemaConfig, limits, prometheus.DefaultRegisterer) if err != nil { return nil, err } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index adfbaee5e2a1..263a769848a7 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -266,7 +266,7 @@ func (t *Loki) initStore() (_ services.Service, err error) { } } - t.store, err = loki_storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides) + t.store, err = loki_storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides, prometheus.DefaultRegisterer) if err != nil { return } diff --git a/pkg/storage/hack/main.go b/pkg/storage/hack/main.go index c92924675bc8..02afd3544f23 100644 --- a/pkg/storage/hack/main.go +++ b/pkg/storage/hack/main.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/weaveworks/common/user" @@ -64,6 +65,7 @@ func getStore() (lstore.Store, error) { }, }, &validation.Overrides{}, + prometheus.DefaultRegisterer, ) if err != nil { return nil, err diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 93a805d88c6e..00c78467933d 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -26,7 +26,7 @@ import ( type Config struct { storage.Config `yaml:",inline"` MaxChunkBatchSize int `yaml:"max_chunk_batch_size"` - BoltDBShipperConfig local.ShipperConfig `yaml:"boltdb_shipper_config"` + BoltDBShipperConfig local.ShipperConfig `yaml:"boltdb_shipper"` } // RegisterFlags adds the flags required to configure this flag set. @@ -49,10 +49,10 @@ type store struct { } // NewStore creates a new Loki Store using configuration supplied. -func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits storage.StoreLimits) (Store, error) { - registerCustomIndexClients(cfg, schemaCfg) +func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits storage.StoreLimits, registerer prometheus.Registerer) (Store, error) { + registerCustomIndexClients(cfg) - s, err := storage.NewStore(cfg.Config, storeCfg, schemaCfg, limits, prometheus.DefaultRegisterer) + s, err := storage.NewStore(cfg.Config, storeCfg, schemaCfg, limits, registerer) if err != nil { return nil, err } @@ -218,35 +218,23 @@ func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk. return filtered } -func registerCustomIndexClients(cfg Config, schemaCfg chunk.SchemaConfig) { - boltdbShipperInstances := 0 +func registerCustomIndexClients(cfg Config) { + // BoltDB Shipper is supposed to be run as a singleton. + // This could also be done in NewBoltDBIndexClientWithShipper factory method but we are doing it here because that method is used + // in tests for creating multiple instances of it at a time. + var boltDBIndexClientWithShipper chunk.IndexClient + storage.RegisterIndexStore(local.BoltDBShipperType, func() (chunk.IndexClient, error) { - // since we do not know which object client is being used for the period for which we are creating this index client, - // we need to iterate through all the periodic configs to find the right one. - // We maintain number of instances that we have already created in boltdbShipperInstances and then count the number of - // encounters of BoltDBShipperType until we find the right periodic config for getting the ObjectType. - // This is done assuming we are creating index client in the order of periodic configs. - // Note: We are assuming that user would never store chunks in table based store otherwise NewObjectClient would return an error. - - // ToDo: Try passing on ObjectType from Cortex to the callback for creating custom index client. - boltdbShipperEncounter := 0 - objectStoreType := "" - for _, config := range schemaCfg.Configs { - if config.IndexType == local.BoltDBShipperType { - boltdbShipperEncounter++ - if boltdbShipperEncounter > boltdbShipperInstances { - objectStoreType = config.ObjectType - break - } - } + if boltDBIndexClientWithShipper != nil { + return boltDBIndexClientWithShipper, nil } - boltdbShipperInstances++ - objectClient, err := storage.NewObjectClient(objectStoreType, cfg.Config) + objectClient, err := storage.NewObjectClient(cfg.BoltDBShipperConfig.SharedStoreType, cfg.Config) if err != nil { return nil, err } - return local.NewBoltDBIndexClient(cortex_local.BoltDBConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory}, objectClient, cfg.BoltDBShipperConfig) + boltDBIndexClientWithShipper, err = local.NewBoltDBIndexClientWithShipper(cortex_local.BoltDBConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory}, objectClient, cfg.BoltDBShipperConfig) + return boltDBIndexClientWithShipper, err }, nil) } diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 03a2953301e2..3641055aa588 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -2,9 +2,12 @@ package storage import ( "context" + "io/ioutil" "log" "net/http" _ "net/http/pprof" + "os" + "path" "runtime" "testing" "time" @@ -14,13 +17,15 @@ import ( "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/chunk" - "github.com/cortexproject/cortex/pkg/chunk/local" + cortex_local "github.com/cortexproject/cortex/pkg/chunk/local" "github.com/cortexproject/cortex/pkg/chunk/storage" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql/marshal" + "github.com/grafana/loki/pkg/storage/stores/local" "github.com/grafana/loki/pkg/util/validation" ) @@ -149,8 +154,8 @@ func getLocalStore() Store { } store, err := NewStore(Config{ Config: storage.Config{ - BoltDBConfig: local.BoltDBConfig{Directory: "/tmp/benchmark/index"}, - FSConfig: local.FSConfig{Directory: "/tmp/benchmark/chunks"}, + BoltDBConfig: cortex_local.BoltDBConfig{Directory: "/tmp/benchmark/index"}, + FSConfig: cortex_local.FSConfig{Directory: "/tmp/benchmark/chunks"}, }, MaxChunkBatchSize: 10, }, chunk.StoreConfig{}, chunk.SchemaConfig{ @@ -166,7 +171,7 @@ func getLocalStore() Store { }, }, }, - }, limits) + }, limits, nil) if err != nil { panic(err) } @@ -422,6 +427,108 @@ func Test_store_GetSeries(t *testing.T) { } } +type timeRange struct { + from, to time.Time +} + +func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) { + tempDir, err := ioutil.TempDir("", "multiple-boltdb-shippers") + require.NoError(t, err) + + defer func() { + require.NoError(t, os.RemoveAll(tempDir)) + }() + + limits, err := validation.NewOverrides(validation.Limits{}, nil) + require.NoError(t, err) + + // config for BoltDB Shipper + boltdbShipperConfig := local.ShipperConfig{} + flagext.DefaultValues(&boltdbShipperConfig) + boltdbShipperConfig.ActiveIndexDirectory = path.Join(tempDir, "index") + boltdbShipperConfig.SharedStoreType = "filesystem" + boltdbShipperConfig.CacheLocation = path.Join(tempDir, "boltdb-shipper-cache") + + // dates for activation of boltdb shippers + firstStoreDate := parseDate("2019-01-01") + secondStoreDate := parseDate("2019-01-02") + + store, err := NewStore(Config{ + Config: storage.Config{ + FSConfig: cortex_local.FSConfig{Directory: path.Join(tempDir, "chunks")}, + }, + BoltDBShipperConfig: boltdbShipperConfig, + }, chunk.StoreConfig{}, chunk.SchemaConfig{ + Configs: []chunk.PeriodConfig{ + { + From: chunk.DayTime{Time: timeToModelTime(firstStoreDate)}, + IndexType: "boltdb-shipper", + ObjectType: "filesystem", + Schema: "v9", + IndexTables: chunk.PeriodicTableConfig{ + Prefix: "index_", + Period: time.Hour * 168, + }, + }, + { + From: chunk.DayTime{Time: timeToModelTime(secondStoreDate)}, + IndexType: "boltdb-shipper", + ObjectType: "filesystem", + Schema: "v11", + IndexTables: chunk.PeriodicTableConfig{ + Prefix: "index_", + Period: time.Hour * 168, + }, + RowShards: 2, + }, + }, + }, limits, nil) + require.NoError(t, err) + + // time ranges adding a chunk for each store and a chunk which overlaps both the stores + chunksToBuildForTimeRanges := []timeRange{ + { + // chunk just for first store + secondStoreDate.Add(-3 * time.Hour), + secondStoreDate.Add(-2 * time.Hour), + }, + { + // chunk overlapping both the stores + secondStoreDate.Add(-time.Hour), + secondStoreDate.Add(time.Hour), + }, + { + // chunk just for second store + secondStoreDate.Add(2 * time.Hour), + secondStoreDate.Add(3 * time.Hour), + }, + } + + // build and add chunks to the store + addedChunkIDs := map[string]struct{}{} + for _, tr := range chunksToBuildForTimeRanges { + chk := newChunk(buildTestStreams(fooLabelsWithName, tr)) + + err := store.PutOne(ctx, chk.From, chk.Through, chk) + require.NoError(t, err) + + addedChunkIDs[chk.ExternalKey()] = struct{}{} + } + + // get all the chunks from both the stores + chunks, err := store.Get(ctx, "fake", timeToModelTime(firstStoreDate), timeToModelTime(secondStoreDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName)...) + require.NoError(t, err) + + // we get common chunk twice because it is indexed in both the stores + require.Len(t, chunks, len(addedChunkIDs)+1) + + // check whether we got back all the chunks which were added + for i := range chunks { + _, ok := addedChunkIDs[chunks[i].ExternalKey()] + require.True(t, ok) + } +} + func mustParseLabels(s string) map[string]string { l, err := marshal.NewLabelSet(s) @@ -431,3 +538,31 @@ func mustParseLabels(s string) map[string]string { return l } + +func parseDate(in string) time.Time { + t, err := time.Parse("2006-01-02", in) + if err != nil { + panic(err) + } + return t +} + +func buildTestStreams(labels string, tr timeRange) logproto.Stream { + stream := logproto.Stream{ + Labels: labels, + Entries: []logproto.Entry{}, + } + + for from := tr.from; from.Before(tr.to); from = from.Add(time.Second) { + stream.Entries = append(stream.Entries, logproto.Entry{ + Timestamp: from, + Line: from.String(), + }) + } + + return stream +} + +func timeToModelTime(t time.Time) model.Time { + return model.TimeFromUnixNano(t.UnixNano()) +} diff --git a/pkg/storage/stores/local/boltdb_index_client.go b/pkg/storage/stores/local/boltdb_index_client.go index 2d1c80b3c8e1..ef60ff4e5726 100644 --- a/pkg/storage/stores/local/boltdb_index_client.go +++ b/pkg/storage/stores/local/boltdb_index_client.go @@ -14,8 +14,8 @@ type BoltdbIndexClientWithShipper struct { shipper *Shipper } -// NewBoltDBIndexClient creates a new IndexClient that used BoltDB. -func NewBoltDBIndexClient(cfg local.BoltDBConfig, archiveStoreClient chunk.ObjectClient, archiverCfg ShipperConfig) (chunk.IndexClient, error) { +// NewBoltDBIndexClientWithShipper creates a new IndexClient that used BoltDB. +func NewBoltDBIndexClientWithShipper(cfg local.BoltDBConfig, archiveStoreClient chunk.ObjectClient, archiverCfg ShipperConfig) (chunk.IndexClient, error) { boltDBIndexClient, err := local.NewBoltDBIndexClient(cfg) if err != nil { return nil, err diff --git a/pkg/storage/stores/local/shipper.go b/pkg/storage/stores/local/shipper.go index 59f72ef1f64a..1856e3698a6e 100644 --- a/pkg/storage/stores/local/shipper.go +++ b/pkg/storage/stores/local/shipper.go @@ -43,6 +43,7 @@ type BoltDBGetter interface { type ShipperConfig struct { ActiveIndexDirectory string `yaml:"active_index_directory"` + SharedStoreType string `yaml:"shared_store"` CacheLocation string `yaml:"cache_location"` CacheTTL time.Duration `yaml:"cache_ttl"` ResyncInterval time.Duration `yaml:"resync_interval"` @@ -53,6 +54,7 @@ type ShipperConfig struct { // RegisterFlags registers flags. func (cfg *ShipperConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.ActiveIndexDirectory, "boltdb.shipper.active-index-directory", "", "Directory where ingesters would write boltdb files which would then be uploaded by shipper to configured storage") + f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.shared-store", "", "Shared store for keeping boltdb files. Supported types: gcs, s3, azure, filesystem") f.StringVar(&cfg.CacheLocation, "boltdb.shipper.cache-location", "", "Cache location for restoring boltDB files for queries") f.DurationVar(&cfg.CacheTTL, "boltdb.shipper.cache-ttl", 24*time.Hour, "TTL for boltDB files restored in cache for queries") f.DurationVar(&cfg.ResyncInterval, "boltdb.shipper.resync-interval", 5*time.Minute, "Resync downloaded files with the storage") diff --git a/pkg/storage/stores/local/uploads_test.go b/pkg/storage/stores/local/uploads_test.go index 4a71da5194b7..40404ae2773c 100644 --- a/pkg/storage/stores/local/uploads_test.go +++ b/pkg/storage/stores/local/uploads_test.go @@ -38,7 +38,7 @@ func createTestBoltDBWithShipper(t *testing.T, parentTempDir, ingesterName, loca }) require.NoError(t, err) - boltdbIndexClientWithShipper, err := NewBoltDBIndexClient(local.BoltDBConfig{Directory: shipperConfig.ActiveIndexDirectory}, archiveStoreClient, shipperConfig) + boltdbIndexClientWithShipper, err := NewBoltDBIndexClientWithShipper(local.BoltDBConfig{Directory: shipperConfig.ActiveIndexDirectory}, archiveStoreClient, shipperConfig) require.NoError(t, err) return boltdbIndexClientWithShipper.(*BoltdbIndexClientWithShipper)