make boltdb shipper singleton and some other minor refactoring (#1995)
sandeepsukhani authored Apr 27, 2020
1 parent 47b79f3 commit 21d4ca4
Showing 9 changed files with 166 additions and 37 deletions.
3 changes: 2 additions & 1 deletion docs/operations/storage/boltdb-shipper.md
@@ -26,8 +26,9 @@ storage_config:
gcs:
bucket_name: GCS_BUCKET_NAME

boltdb_shipper_config:
boltdb_shipper:
active_index_directory: /loki/index
shared_store: gcs
cache_location: /loki/boltdb-cache
```
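For orientation, here is a minimal sketch (not part of the commit) of how the renamed `boltdb_shipper` block above maps onto the Go config types touched later in this diff; the paths and the `gcs` value are illustrative, and `exampleConfig` is a hypothetical helper:

```go
package example

import (
	loki_storage "github.com/grafana/loki/pkg/storage"
	"github.com/grafana/loki/pkg/storage/stores/local"
)

// exampleConfig is illustrative only: `boltdb_shipper` unmarshals into
// BoltDBShipperConfig (pkg/storage/store.go) and the new `shared_store` key
// into SharedStoreType (pkg/storage/stores/local/shipper.go).
func exampleConfig() loki_storage.Config {
	return loki_storage.Config{
		BoltDBShipperConfig: local.ShipperConfig{
			ActiveIndexDirectory: "/loki/index",
			SharedStoreType:      "gcs",
			CacheLocation:        "/loki/boltdb-cache",
		},
	}
}
```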
3 changes: 2 additions & 1 deletion pkg/logcli/query/query.go
@@ -12,6 +12,7 @@ import (

"github.com/fatih/color"
json "github.com/json-iterator/go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/promql"
"github.com/weaveworks/common/user"

@@ -148,7 +149,7 @@ func localStore(conf loki.Config) (logql.Querier, error) {
if err != nil {
return nil, err
}
s, err := storage.NewStore(conf.StorageConfig, conf.ChunkStoreConfig, conf.SchemaConfig, limits)
s, err := storage.NewStore(conf.StorageConfig, conf.ChunkStoreConfig, conf.SchemaConfig, limits, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion pkg/loki/modules.go
@@ -266,7 +266,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {
}
}

t.store, err = loki_storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides)
t.store, err = loki_storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides, prometheus.DefaultRegisterer)
if err != nil {
return
}
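Since `NewStore` now takes the registerer as an explicit argument, each caller decides where store metrics are registered: the production call sites above pass `prometheus.DefaultRegisterer`, while the new test further down passes `nil`. A hedged sketch of the calling pattern, with `buildStore` as a hypothetical helper and import paths assumed from this diff:

```go
package example

import (
	"github.com/cortexproject/cortex/pkg/chunk"
	"github.com/prometheus/client_golang/prometheus"

	loki_storage "github.com/grafana/loki/pkg/storage"
	"github.com/grafana/loki/pkg/util/validation"
)

// buildStore is illustrative only; it mirrors how modules.go and logcli wire
// the store after this change, threading a prometheus.Registerer through
// instead of hard-coding prometheus.DefaultRegisterer inside NewStore.
func buildStore(cfg loki_storage.Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig) (loki_storage.Store, error) {
	limits, err := validation.NewOverrides(validation.Limits{}, nil)
	if err != nil {
		return nil, err
	}
	// Pass nil here (as the new store test does) to skip metric registration.
	return loki_storage.NewStore(cfg, storeCfg, schemaCfg, limits, prometheus.DefaultRegisterer)
}
```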
2 changes: 2 additions & 0 deletions pkg/storage/hack/main.go
@@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/user"
@@ -64,6 +65,7 @@ func getStore() (lstore.Store, error) {
},
},
&validation.Overrides{},
prometheus.DefaultRegisterer,
)
if err != nil {
return nil, err
42 changes: 15 additions & 27 deletions pkg/storage/store.go
@@ -26,7 +26,7 @@ import (
type Config struct {
storage.Config `yaml:",inline"`
MaxChunkBatchSize int `yaml:"max_chunk_batch_size"`
BoltDBShipperConfig local.ShipperConfig `yaml:"boltdb_shipper_config"`
BoltDBShipperConfig local.ShipperConfig `yaml:"boltdb_shipper"`
}

// RegisterFlags adds the flags required to configure this flag set.
@@ -49,10 +49,10 @@ type store struct {
}

// NewStore creates a new Loki Store using configuration supplied.
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits storage.StoreLimits) (Store, error) {
registerCustomIndexClients(cfg, schemaCfg)
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits storage.StoreLimits, registerer prometheus.Registerer) (Store, error) {
registerCustomIndexClients(cfg)

s, err := storage.NewStore(cfg.Config, storeCfg, schemaCfg, limits, prometheus.DefaultRegisterer)
s, err := storage.NewStore(cfg.Config, storeCfg, schemaCfg, limits, registerer)
if err != nil {
return nil, err
}
@@ -218,35 +218,23 @@ func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk.Chunk {
return filtered
}

func registerCustomIndexClients(cfg Config, schemaCfg chunk.SchemaConfig) {
boltdbShipperInstances := 0
func registerCustomIndexClients(cfg Config) {
// BoltDB Shipper is supposed to be run as a singleton.
// This could also be done in NewBoltDBIndexClientWithShipper factory method but we are doing it here because that method is used
// in tests for creating multiple instances of it at a time.
var boltDBIndexClientWithShipper chunk.IndexClient

storage.RegisterIndexStore(local.BoltDBShipperType, func() (chunk.IndexClient, error) {
// since we do not know which object client is being used for the period for which we are creating this index client,
// we need to iterate through all the periodic configs to find the right one.
// We maintain number of instances that we have already created in boltdbShipperInstances and then count the number of
// encounters of BoltDBShipperType until we find the right periodic config for getting the ObjectType.
// This is done assuming we are creating index client in the order of periodic configs.
// Note: We are assuming that user would never store chunks in table based store otherwise NewObjectClient would return an error.

// ToDo: Try passing on ObjectType from Cortex to the callback for creating custom index client.
boltdbShipperEncounter := 0
objectStoreType := ""
for _, config := range schemaCfg.Configs {
if config.IndexType == local.BoltDBShipperType {
boltdbShipperEncounter++
if boltdbShipperEncounter > boltdbShipperInstances {
objectStoreType = config.ObjectType
break
}
}
if boltDBIndexClientWithShipper != nil {
return boltDBIndexClientWithShipper, nil
}

boltdbShipperInstances++
objectClient, err := storage.NewObjectClient(objectStoreType, cfg.Config)
objectClient, err := storage.NewObjectClient(cfg.BoltDBShipperConfig.SharedStoreType, cfg.Config)
if err != nil {
return nil, err
}

return local.NewBoltDBIndexClient(cortex_local.BoltDBConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory}, objectClient, cfg.BoltDBShipperConfig)
boltDBIndexClientWithShipper, err = local.NewBoltDBIndexClientWithShipper(cortex_local.BoltDBConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory}, objectClient, cfg.BoltDBShipperConfig)
return boltDBIndexClientWithShipper, err
}, nil)
}
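The singleton behaviour comes entirely from the factory closure registered with `storage.RegisterIndexStore`: the first call builds the boltdb-shipper index client, every later call returns the cached instance, so multiple schema periods using `boltdb-shipper` share one client. Reduced to its shape, the pattern is a captured variable, as in this self-contained sketch (the names here are illustrative, not Loki APIs):

```go
package example

// IndexClient stands in for chunk.IndexClient in this sketch.
type IndexClient interface {
	Stop()
}

// singletonFactory mirrors the shape of the closure in registerCustomIndexClients:
// `client` is captured, so `build` runs at most once no matter how many times
// the returned factory is invoked.
func singletonFactory(build func() (IndexClient, error)) func() (IndexClient, error) {
	var client IndexClient
	return func() (IndexClient, error) {
		if client != nil {
			return client, nil
		}
		var err error
		client, err = build()
		return client, err
	}
}
```

In the real code the captured variable is `boltDBIndexClientWithShipper`, and the build step is creating the object client from `cfg.BoltDBShipperConfig.SharedStoreType` and calling `NewBoltDBIndexClientWithShipper`, as shown above.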
143 changes: 139 additions & 4 deletions pkg/storage/store_test.go
@@ -2,9 +2,12 @@ package storage

import (
"context"
"io/ioutil"
"log"
"net/http"
_ "net/http/pprof"
"os"
"path"
"runtime"
"testing"
"time"
@@ -14,13 +17,15 @@ import (
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
cortex_local "github.com/cortexproject/cortex/pkg/chunk/local"
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/util/flagext"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/marshal"
"github.com/grafana/loki/pkg/storage/stores/local"
"github.com/grafana/loki/pkg/util/validation"
)

@@ -149,8 +154,8 @@ func getLocalStore() Store {
}
store, err := NewStore(Config{
Config: storage.Config{
BoltDBConfig: local.BoltDBConfig{Directory: "/tmp/benchmark/index"},
FSConfig: local.FSConfig{Directory: "/tmp/benchmark/chunks"},
BoltDBConfig: cortex_local.BoltDBConfig{Directory: "/tmp/benchmark/index"},
FSConfig: cortex_local.FSConfig{Directory: "/tmp/benchmark/chunks"},
},
MaxChunkBatchSize: 10,
}, chunk.StoreConfig{}, chunk.SchemaConfig{
@@ -166,7 +171,7 @@ func getLocalStore() Store {
},
},
},
}, limits)
}, limits, nil)
if err != nil {
panic(err)
}
@@ -422,6 +427,108 @@ func Test_store_GetSeries(t *testing.T) {
}
}

type timeRange struct {
from, to time.Time
}

func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) {
tempDir, err := ioutil.TempDir("", "multiple-boltdb-shippers")
require.NoError(t, err)

defer func() {
require.NoError(t, os.RemoveAll(tempDir))
}()

limits, err := validation.NewOverrides(validation.Limits{}, nil)
require.NoError(t, err)

// config for BoltDB Shipper
boltdbShipperConfig := local.ShipperConfig{}
flagext.DefaultValues(&boltdbShipperConfig)
boltdbShipperConfig.ActiveIndexDirectory = path.Join(tempDir, "index")
boltdbShipperConfig.SharedStoreType = "filesystem"
boltdbShipperConfig.CacheLocation = path.Join(tempDir, "boltdb-shipper-cache")

// dates for activation of boltdb shippers
firstStoreDate := parseDate("2019-01-01")
secondStoreDate := parseDate("2019-01-02")

store, err := NewStore(Config{
Config: storage.Config{
FSConfig: cortex_local.FSConfig{Directory: path.Join(tempDir, "chunks")},
},
BoltDBShipperConfig: boltdbShipperConfig,
}, chunk.StoreConfig{}, chunk.SchemaConfig{
Configs: []chunk.PeriodConfig{
{
From: chunk.DayTime{Time: timeToModelTime(firstStoreDate)},
IndexType: "boltdb-shipper",
ObjectType: "filesystem",
Schema: "v9",
IndexTables: chunk.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 168,
},
},
{
From: chunk.DayTime{Time: timeToModelTime(secondStoreDate)},
IndexType: "boltdb-shipper",
ObjectType: "filesystem",
Schema: "v11",
IndexTables: chunk.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 168,
},
RowShards: 2,
},
},
}, limits, nil)
require.NoError(t, err)

// time ranges adding a chunk for each store and a chunk which overlaps both the stores
chunksToBuildForTimeRanges := []timeRange{
{
// chunk just for first store
secondStoreDate.Add(-3 * time.Hour),
secondStoreDate.Add(-2 * time.Hour),
},
{
// chunk overlapping both the stores
secondStoreDate.Add(-time.Hour),
secondStoreDate.Add(time.Hour),
},
{
// chunk just for second store
secondStoreDate.Add(2 * time.Hour),
secondStoreDate.Add(3 * time.Hour),
},
}

// build and add chunks to the store
addedChunkIDs := map[string]struct{}{}
for _, tr := range chunksToBuildForTimeRanges {
chk := newChunk(buildTestStreams(fooLabelsWithName, tr))

err := store.PutOne(ctx, chk.From, chk.Through, chk)
require.NoError(t, err)

addedChunkIDs[chk.ExternalKey()] = struct{}{}
}

// get all the chunks from both the stores
chunks, err := store.Get(ctx, "fake", timeToModelTime(firstStoreDate), timeToModelTime(secondStoreDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName)...)
require.NoError(t, err)

// we get common chunk twice because it is indexed in both the stores
require.Len(t, chunks, len(addedChunkIDs)+1)

// check whether we got back all the chunks which were added
for i := range chunks {
_, ok := addedChunkIDs[chunks[i].ExternalKey()]
require.True(t, ok)
}
}

func mustParseLabels(s string) map[string]string {
l, err := marshal.NewLabelSet(s)

@@ -431,3 +538,31 @@ func mustParseLabels(s string) map[string]string {

return l
}

func parseDate(in string) time.Time {
t, err := time.Parse("2006-01-02", in)
if err != nil {
panic(err)
}
return t
}

func buildTestStreams(labels string, tr timeRange) logproto.Stream {
stream := logproto.Stream{
Labels: labels,
Entries: []logproto.Entry{},
}

for from := tr.from; from.Before(tr.to); from = from.Add(time.Second) {
stream.Entries = append(stream.Entries, logproto.Entry{
Timestamp: from,
Line: from.String(),
})
}

return stream
}

func timeToModelTime(t time.Time) model.Time {
return model.TimeFromUnixNano(t.UnixNano())
}
4 changes: 2 additions & 2 deletions pkg/storage/stores/local/boltdb_index_client.go
@@ -14,8 +14,8 @@ type BoltdbIndexClientWithShipper struct {
shipper *Shipper
}

// NewBoltDBIndexClient creates a new IndexClient that used BoltDB.
func NewBoltDBIndexClient(cfg local.BoltDBConfig, archiveStoreClient chunk.ObjectClient, archiverCfg ShipperConfig) (chunk.IndexClient, error) {
// NewBoltDBIndexClientWithShipper creates a new IndexClient that used BoltDB.
func NewBoltDBIndexClientWithShipper(cfg local.BoltDBConfig, archiveStoreClient chunk.ObjectClient, archiverCfg ShipperConfig) (chunk.IndexClient, error) {
boltDBIndexClient, err := local.NewBoltDBIndexClient(cfg)
if err != nil {
return nil, err
2 changes: 2 additions & 0 deletions pkg/storage/stores/local/shipper.go
@@ -43,6 +43,7 @@ type BoltDBGetter interface {

type ShipperConfig struct {
ActiveIndexDirectory string `yaml:"active_index_directory"`
SharedStoreType string `yaml:"shared_store"`
CacheLocation string `yaml:"cache_location"`
CacheTTL time.Duration `yaml:"cache_ttl"`
ResyncInterval time.Duration `yaml:"resync_interval"`
@@ -53,6 +54,7 @@ type ShipperConfig struct {
// RegisterFlags registers flags.
func (cfg *ShipperConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.ActiveIndexDirectory, "boltdb.shipper.active-index-directory", "", "Directory where ingesters would write boltdb files which would then be uploaded by shipper to configured storage")
f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.shared-store", "", "Shared store for keeping boltdb files. Supported types: gcs, s3, azure, filesystem")
f.StringVar(&cfg.CacheLocation, "boltdb.shipper.cache-location", "", "Cache location for restoring boltDB files for queries")
f.DurationVar(&cfg.CacheTTL, "boltdb.shipper.cache-ttl", 24*time.Hour, "TTL for boltDB files restored in cache for queries")
f.DurationVar(&cfg.ResyncInterval, "boltdb.shipper.resync-interval", 5*time.Minute, "Resync downloaded files with the storage")
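The new `shared_store` setting is what the singleton factory above uses to pick the object client, replacing the old scan of the schema periods for an `ObjectType`. A hedged sketch of loading the shipper flags (including the new `-boltdb.shipper.shared-store`); `loadShipperConfig` is a hypothetical helper, not part of the commit:

```go
package example

import (
	"flag"

	"github.com/grafana/loki/pkg/storage/stores/local"
)

// loadShipperConfig registers the ShipperConfig flags on its own FlagSet and
// parses the given arguments, e.g.
// loadShipperConfig([]string{"-boltdb.shipper.shared-store=gcs"}).
func loadShipperConfig(args []string) (local.ShipperConfig, error) {
	var cfg local.ShipperConfig
	fs := flag.NewFlagSet("boltdb-shipper", flag.ContinueOnError)
	cfg.RegisterFlags(fs)
	err := fs.Parse(args)
	return cfg, err
}
```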
2 changes: 1 addition & 1 deletion pkg/storage/stores/local/uploads_test.go
@@ -38,7 +38,7 @@ func createTestBoltDBWithShipper(t *testing.T, parentTempDir, ingesterName, loca
})
require.NoError(t, err)

boltdbIndexClientWithShipper, err := NewBoltDBIndexClient(local.BoltDBConfig{Directory: shipperConfig.ActiveIndexDirectory}, archiveStoreClient, shipperConfig)
boltdbIndexClientWithShipper, err := NewBoltDBIndexClientWithShipper(local.BoltDBConfig{Directory: shipperConfig.ActiveIndexDirectory}, archiveStoreClient, shipperConfig)
require.NoError(t, err)

return boltdbIndexClientWithShipper.(*BoltdbIndexClientWithShipper)
