Start refactor of storage (chunks) clients metrics into structs #5057

Merged
8 commits merged on Jan 25, 2022
5 changes: 3 additions & 2 deletions cmd/migrate/main.go
@@ -92,7 +92,8 @@ func main() {
}
// Create a new registerer to avoid registering duplicate metrics
prometheus.DefaultRegisterer = prometheus.NewRegistry()
sourceStore, err := chunk_storage.NewStore(sourceConfig.StorageConfig.Config, sourceConfig.ChunkStoreConfig.StoreConfig, sourceConfig.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util_log.Logger)
clientMetrics := chunk_storage.NewClientMetrics()
sourceStore, err := chunk_storage.NewStore(sourceConfig.StorageConfig.Config, sourceConfig.ChunkStoreConfig.StoreConfig, sourceConfig.SchemaConfig.SchemaConfig, limits, clientMetrics, prometheus.DefaultRegisterer, nil, util_log.Logger)
if err != nil {
log.Println("Failed to create source store:", err)
os.Exit(1)
@@ -105,7 +106,7 @@ func main() {

// Create a new registerer to avoid registering duplicate metrics
prometheus.DefaultRegisterer = prometheus.NewRegistry()
destStore, err := chunk_storage.NewStore(destConfig.StorageConfig.Config, destConfig.ChunkStoreConfig.StoreConfig, destConfig.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util_log.Logger)
destStore, err := chunk_storage.NewStore(destConfig.StorageConfig.Config, destConfig.ChunkStoreConfig.StoreConfig, destConfig.SchemaConfig.SchemaConfig, limits, clientMetrics, prometheus.DefaultRegisterer, nil, util_log.Logger)
if err != nil {
log.Println("Failed to create destination store:", err)
os.Exit(1)
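This is the pattern the whole PR follows: metrics that previously lived in package-level variables registered at `init()` time are now held in a `ClientMetrics` struct that is created once and passed to each store constructor. A minimal sketch of the shape this diff implies; `NewClientMetrics`, `Unregister`, and the `AzureMetrics` field are confirmed by the changes below, while any other fields the real struct carries are omitted here:

```go
package storage

import "github.com/grafana/loki/pkg/storage/chunk/azure"

// ClientMetrics bundles per-client metrics so callers control registration.
type ClientMetrics struct {
	AzureMetrics azure.BlobStorageMetrics
}

// NewClientMetrics constructs and registers all client metrics once.
func NewClientMetrics() ClientMetrics {
	return ClientMetrics{
		AzureMetrics: azure.NewBlobStorageMetrics(),
	}
}

// Unregister removes the metrics from the default registerer (handy in tests).
func (cm *ClientMetrics) Unregister() {
	cm.AzureMetrics.Unregister()
}
```

Note how the migrate command above creates the struct once and reuses it for both the source and destination stores, even though each store gets its own fresh registerer.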
5 changes: 3 additions & 2 deletions pkg/logcli/query/query.go
@@ -189,9 +189,10 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
if err != nil {
return err
}
storage.RegisterCustomIndexClients(&conf.StorageConfig, prometheus.DefaultRegisterer)
cm := chunk_storage.NewClientMetrics()
storage.RegisterCustomIndexClients(&conf.StorageConfig, cm, prometheus.DefaultRegisterer)
conf.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
chunkStore, err := chunk_storage.NewStore(conf.StorageConfig.Config, conf.ChunkStoreConfig.StoreConfig, conf.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util_log.Logger)
chunkStore, err := chunk_storage.NewStore(conf.StorageConfig.Config, conf.ChunkStoreConfig.StoreConfig, conf.SchemaConfig.SchemaConfig, limits, cm, prometheus.DefaultRegisterer, nil, util_log.Logger)
if err != nil {
return err
}
8 changes: 6 additions & 2 deletions pkg/loki/loki.go
@@ -43,6 +43,7 @@ import (
"github.com/grafana/loki/pkg/scheduler"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk"
chunk_storage "github.com/grafana/loki/pkg/storage/chunk/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
"github.com/grafana/loki/pkg/tracing"
"github.com/grafana/loki/pkg/util/fakeauth"
@@ -264,21 +265,24 @@ type Loki struct {
QueryFrontEndTripperware basetripper.Tripperware
queryScheduler *scheduler.Scheduler

clientMetrics chunk_storage.ClientMetrics

HTTPAuthMiddleware middleware.Interface
}

// New makes a new Loki.
func New(cfg Config) (*Loki, error) {
loki := &Loki{
Cfg: cfg,
Cfg: cfg,
clientMetrics: chunk_storage.NewClientMetrics(),
}

loki.setupAuthMiddleware()
loki.setupGRPCRecoveryMiddleware()
if err := loki.setupModuleManager(); err != nil {
return nil, err
}
storage.RegisterCustomIndexClients(&loki.Cfg.StorageConfig, prometheus.DefaultRegisterer)
storage.RegisterCustomIndexClients(&loki.Cfg.StorageConfig, loki.clientMetrics, prometheus.DefaultRegisterer)

return loki, nil
}
8 changes: 4 additions & 4 deletions pkg/loki/modules.go
@@ -380,7 +380,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {
}
}

chunkStore, err := chunk_storage.NewStore(t.Cfg.StorageConfig.Config, t.Cfg.ChunkStoreConfig.StoreConfig, t.Cfg.SchemaConfig.SchemaConfig, t.overrides, prometheus.DefaultRegisterer, nil, util_log.Logger)
chunkStore, err := chunk_storage.NewStore(t.Cfg.StorageConfig.Config, t.Cfg.ChunkStoreConfig.StoreConfig, t.Cfg.SchemaConfig.SchemaConfig, t.overrides, t.clientMetrics, prometheus.DefaultRegisterer, nil, util_log.Logger)
if err != nil {
return
}
@@ -601,7 +601,7 @@ func (t *Loki) initRulerStorage() (_ services.Service, err error) {
}
}

t.RulerStorage, err = base_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, t.Cfg.StorageConfig.Hedging, ruler.GroupLoader{}, util_log.Logger)
t.RulerStorage, err = base_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, t.Cfg.StorageConfig.Hedging, t.clientMetrics, ruler.GroupLoader{}, util_log.Logger)

return
}
@@ -701,7 +701,7 @@ func (t *Loki) initCompactor() (services.Service, error) {
if err != nil {
return nil, err
}
t.compactor, err = compactor.NewCompactor(t.Cfg.CompactorConfig, t.Cfg.StorageConfig.Config, t.Cfg.SchemaConfig, t.overrides, prometheus.DefaultRegisterer)
t.compactor, err = compactor.NewCompactor(t.Cfg.CompactorConfig, t.Cfg.StorageConfig.Config, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
@@ -718,7 +718,7 @@

func (t *Loki) initIndexGateway() (services.Service, error) {
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
objectClient, err := storage.NewObjectClient(t.Cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType, t.Cfg.StorageConfig.Config)
objectClient, err := storage.NewObjectClient(t.Cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType, t.Cfg.StorageConfig.Config, t.clientMetrics)
if err != nil {
return nil, err
}
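All of the call sites above receive the same `t.clientMetrics` value that `New` creates, so each client metric is registered exactly once per process regardless of which Loki target is running. Condensed, the flow looks like this (names taken from this diff, call arguments abbreviated):

```go
// pkg/loki/loki.go: created once per process.
loki := &Loki{
	Cfg:           cfg,
	clientMetrics: chunk_storage.NewClientMetrics(),
}

// pkg/loki/modules.go: the single instance is threaded into every constructor.
chunkStore, err := chunk_storage.NewStore(
	t.Cfg.StorageConfig.Config, t.Cfg.ChunkStoreConfig.StoreConfig,
	t.Cfg.SchemaConfig.SchemaConfig, t.overrides,
	t.clientMetrics, prometheus.DefaultRegisterer, nil, util_log.Logger,
)
```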
9 changes: 7 additions & 2 deletions pkg/ruler/base/lifecycle_test.go
@@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage/chunk/storage"
"github.com/grafana/loki/pkg/util/test"
)

@@ -24,7 +25,9 @@ func TestRulerShutdown(t *testing.T) {
config, cleanup := defaultRulerConfig(t, newMockRuleStore(mockRules))
defer cleanup()

r, rcleanup := buildRuler(t, config, nil, nil)
m := storage.NewClientMetrics()
defer m.Unregister()
r, rcleanup := buildRuler(t, config, nil, m, nil)
defer rcleanup()

r.cfg.EnableSharding = true
@@ -59,7 +62,9 @@ func TestRuler_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) {
ctx := context.Background()
config, cleanup := defaultRulerConfig(t, newMockRuleStore(mockRules))
defer cleanup()
r, rcleanup := buildRuler(t, config, nil, nil)
m := storage.NewClientMetrics()
defer m.Unregister()
r, rcleanup := buildRuler(t, config, nil, m, nil)
defer rcleanup()
r.cfg.EnableSharding = true
r.cfg.Ring.HeartbeatPeriod = 100 * time.Millisecond
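The `defer m.Unregister()` lines are what make repeated ruler construction safe: `NewClientMetrics` registers its collectors on the Prometheus default registerer via `MustRegister`, which panics if a collector with the same metric name is registered twice. A hypothetical illustration of the failure mode these tests avoid (the test name is an assumption):

```go
func TestClientMetricsReuse(t *testing.T) {
	m := storage.NewClientMetrics()
	m.Unregister() // without this, the next NewClientMetrics would panic

	// Safe now: the previous collectors were removed from the registerer.
	m2 := storage.NewClientMetrics()
	defer m2.Unregister()
}
```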
22 changes: 15 additions & 7 deletions pkg/ruler/base/ruler_test.go
@@ -56,6 +56,7 @@ import (
"github.com/grafana/loki/pkg/ruler/rulestore/objectclient"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/hedging"
chunk_storage "github.com/grafana/loki/pkg/storage/chunk/storage"
"github.com/grafana/loki/pkg/tenant"
)

@@ -221,9 +222,9 @@ func newMockClientsPool(cfg Config, logger log.Logger, reg prometheus.Registerer
}
}

func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.TestConfig, rulerAddrMap map[string]*Ruler) (*Ruler, func()) {
func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.TestConfig, clientMetrics chunk_storage.ClientMetrics, rulerAddrMap map[string]*Ruler) (*Ruler, func()) {
engine, queryable, pusher, logger, overrides, reg, cleanup := testSetup(t, querierTestConfig)
storage, err := NewLegacyRuleStore(rulerConfig.StoreConfig, hedging.Config{}, promRules.FileLoader{}, log.NewNopLogger())
storage, err := NewLegacyRuleStore(rulerConfig.StoreConfig, hedging.Config{}, clientMetrics, promRules.FileLoader{}, log.NewNopLogger())
require.NoError(t, err)

managerFactory := DefaultTenantManagerFactory(rulerConfig, pusher, queryable, engine, overrides, reg)
@@ -244,7 +245,9 @@ }
}

func newTestRuler(t *testing.T, rulerConfig Config) (*Ruler, func()) {
ruler, cleanup := buildRuler(t, rulerConfig, nil, nil)
m := chunk_storage.NewClientMetrics()
defer m.Unregister()
ruler, cleanup := buildRuler(t, rulerConfig, nil, m, nil)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ruler))

// Ensure all rules are loaded before usage
@@ -415,8 +418,9 @@ func TestGetRules(t *testing.T) {
Mock: kvStore,
},
}

r, cleanUp := buildRuler(t, cfg, nil, rulerAddrMap)
m := chunk_storage.NewClientMetrics()
defer m.Unregister()
r, cleanUp := buildRuler(t, cfg, nil, m, rulerAddrMap)
r.limits = ruleLimits{evalDelay: 0, tenantShard: tc.shuffleShardSize}
t.Cleanup(cleanUp)
rulerAddrMap[id] = r
@@ -919,7 +923,9 @@ func TestSharding(t *testing.T) {
DisabledTenants: tc.disabledUsers,
}

r, cleanup := buildRuler(t, cfg, nil, nil)
m := chunk_storage.NewClientMetrics()
defer m.Unregister()
r, cleanup := buildRuler(t, cfg, nil, m, nil)
r.limits = ruleLimits{evalDelay: 0, tenantShard: tc.shuffleShardSize}
t.Cleanup(cleanup)

@@ -1276,8 +1282,10 @@ func TestRecoverAlertsPostOutage(t *testing.T) {
querier.UseAlwaysQueryable(querier.NewChunkStoreQueryable(querierConfig, &emptyChunkStore{})),
}

m := chunk_storage.NewClientMetrics()
defer m.Unregister()
// create a ruler but don't start it. instead, we'll evaluate the rule groups manually.
r, rcleanup := buildRuler(t, rulerCfg, &querier.TestConfig{Cfg: querierConfig, Distributor: d, Stores: queryables}, nil)
r, rcleanup := buildRuler(t, rulerCfg, &querier.TestConfig{Cfg: querierConfig, Distributor: d, Stores: queryables}, m, nil)
r.syncRules(context.Background(), rulerSyncReasonInitial)
defer rcleanup()

5 changes: 3 additions & 2 deletions pkg/ruler/base/storage.go
@@ -25,6 +25,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/gcp"
"github.com/grafana/loki/pkg/storage/chunk/hedging"
"github.com/grafana/loki/pkg/storage/chunk/openstack"
"github.com/grafana/loki/pkg/storage/chunk/storage"
)

// RuleStoreConfig configures a rule store.
@@ -77,7 +78,7 @@ func (cfg *RuleStoreConfig) IsDefaults() bool {
// NewLegacyRuleStore returns a rule store backend client based on the provided cfg.
// The client used by the function is based on legacy object store clients that shouldn't
// be used anymore.
func NewLegacyRuleStore(cfg RuleStoreConfig, hedgeCfg hedging.Config, loader promRules.GroupLoader, logger log.Logger) (rulestore.RuleStore, error) {
func NewLegacyRuleStore(cfg RuleStoreConfig, hedgeCfg hedging.Config, clientMetrics storage.ClientMetrics, loader promRules.GroupLoader, logger log.Logger) (rulestore.RuleStore, error) {
if cfg.mock != nil {
return cfg.mock, nil
}
@@ -97,7 +98,7 @@ func NewLegacyRuleStore(cfg RuleStoreConfig, hedgeCfg hedging.Config, loader pro
}
return configdb.NewConfigRuleStore(c), nil
case "azure":
client, err = azure.NewBlobStorage(&cfg.Azure, hedgeCfg)
client, err = azure.NewBlobStorage(&cfg.Azure, clientMetrics.AzureMetrics, hedgeCfg)
case "gcs":
client, err = gcp.NewGCSObjectClient(context.Background(), cfg.GCS, hedgeCfg)
case "s3":
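For reference, a call under the new signature looks roughly like the one in `buildRuler` in `pkg/ruler/base/ruler_test.go`; the surrounding variables and return statement here are placeholders:

```go
cm := storage.NewClientMetrics()
defer cm.Unregister() // short-lived caller; a long-lived process would keep these registered

ruleStore, err := NewLegacyRuleStore(cfg.StoreConfig, hedging.Config{}, cm, promRules.FileLoader{}, log.NewNopLogger())
if err != nil {
	return nil, err
}
```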
72 changes: 44 additions & 28 deletions pkg/storage/chunk/azure/blob_storage_client.go
@@ -36,26 +36,6 @@ const (
azureUSGovernment = "AzureUSGovernment"
)

var requestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "loki",
Name: "azure_blob_request_duration_seconds",
Help: "Time spent doing azure blob requests.",
// Latency seems to range from a few ms to a few secs and is
// important. So use 6 buckets from 5ms to 5s.
Buckets: prometheus.ExponentialBuckets(0.005, 4, 6),
}, []string{"operation", "status_code"}))

var egressBytesTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "loki",
Name: "azure_blob_egress_bytes_total",
Help: "Total bytes downloaded from Azure Blob Storage.",
})

func init() {
requestDuration.Register()
prometheus.MustRegister(egressBytesTotal)
}

var (
supportedEnvironments = []string{azureGlobal, azureChinaCloud, azureGermanCloud, azureUSGovernment}
noClientKey = azblob.ClientProvidedKeyOptions{}
@@ -154,23 +134,60 @@ func (c *BlobStorageConfig) ToCortexAzureConfig() cortex_azure.BlobStorageConfig
}
}

type BlobStorageMetrics struct {
requestDuration *prometheus.HistogramVec
egressBytesTotal prometheus.Counter
}

// NewBlobStorageMetrics creates the blob storage metrics struct and registers all of its metrics.
func NewBlobStorageMetrics() BlobStorageMetrics {
b := BlobStorageMetrics{
requestDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "loki",
Name: "azure_blob_request_duration_seconds",
Help: "Time spent doing azure blob requests.",
// Latency seems to range from a few ms to a few secs and is
// important. So use 6 buckets from 5ms to 5s.
Buckets: prometheus.ExponentialBuckets(0.005, 4, 6),
}, []string{"operation", "status_code"}),
egressBytesTotal: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "loki",
Name: "azure_blob_egress_bytes_total",
Help: "Total bytes downloaded from Azure Blob Storage.",
}),
}
prometheus.MustRegister(b.requestDuration)
prometheus.MustRegister(b.egressBytesTotal)
return b
}

// Unregister removes the blob storage metrics from the Prometheus default registerer; this is useful for tests,
// where we frequently need to create multiple short-lived instances of the metrics struct.
func (bm *BlobStorageMetrics) Unregister() {
prometheus.Unregister(bm.requestDuration)
prometheus.Unregister(bm.egressBytesTotal)
}

// BlobStorage is used to interact with azure blob storage for setting or getting time series chunks.
// Implements ObjectStorage
type BlobStorage struct {
// blobService storage.Serv
cfg *BlobStorageConfig

metrics BlobStorageMetrics

containerURL azblob.ContainerURL

pipeline pipeline.Pipeline
hedgingPipeline pipeline.Pipeline
}

// NewBlobStorage creates a new instance of the BlobStorage struct.
func NewBlobStorage(cfg *BlobStorageConfig, hedgingCfg hedging.Config) (*BlobStorage, error) {
func NewBlobStorage(cfg *BlobStorageConfig, metrics BlobStorageMetrics, hedgingCfg hedging.Config) (*BlobStorage, error) {
log.WarnExperimentalUse("Azure Blob Storage", log.Logger)
blobStorage := &BlobStorage{
cfg: cfg,
cfg: cfg,
metrics: metrics,
}
pipeline, err := blobStorage.newPipeline(hedgingCfg, false)
if err != nil {
@@ -204,13 +221,12 @@ func (b *BlobStorage) GetObject(ctx context.Context, objectKey string) (io.ReadC
size int64
rc io.ReadCloser
)
err := instrument.CollectedRequest(ctx, "azure.GetObject", requestDuration, instrument.ErrorCode, func(ctx context.Context) error {
err := instrument.CollectedRequest(ctx, "azure.GetObject", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error {
var err error
rc, size, err = b.getObject(ctx, objectKey)
return err
})
// Assume that even failed requests' egress bytes count towards rate limits, and count them here.
egressBytesTotal.Add(float64(size))
b.metrics.egressBytesTotal.Add(float64(size))
if err != nil {
// cancel the context if there is an error.
cancel()
@@ -236,7 +252,7 @@ func (b *BlobStorage) getObject(ctx context.Context, objectKey string) (rc io.Re
}

func (b *BlobStorage) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
return instrument.CollectedRequest(ctx, "azure.PutObject", requestDuration, instrument.ErrorCode, func(ctx context.Context) error {
return instrument.CollectedRequest(ctx, "azure.PutObject", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error {
blockBlobURL, err := b.getBlobURL(objectKey, false)
if err != nil {
return err
@@ -380,7 +396,7 @@ func (b *BlobStorage) List(ctx context.Context, prefix, delimiter string) ([]chu
return nil, nil, ctx.Err()
}

err := instrument.CollectedRequest(ctx, "azure.List", requestDuration, instrument.ErrorCode, func(ctx context.Context) error {
err := instrument.CollectedRequest(ctx, "azure.List", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error {
listBlob, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, delimiter, azblob.ListBlobsSegmentOptions{Prefix: prefix})
if err != nil {
return err
Expand Down Expand Up @@ -413,7 +429,7 @@ func (b *BlobStorage) List(ctx context.Context, prefix, delimiter string) ([]chu
}

func (b *BlobStorage) DeleteObject(ctx context.Context, blobID string) error {
return instrument.CollectedRequest(ctx, "azure.DeleteObject", requestDuration, instrument.ErrorCode, func(ctx context.Context) error {
return instrument.CollectedRequest(ctx, "azure.DeleteObject", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error {
blockBlobURL, err := b.getBlobURL(blobID, false)
if err != nil {
return err
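One side effect of deleting the package-level collector is that every call site above now wraps `b.metrics.requestDuration` in a fresh `instrument.NewHistogramCollector` per request. The wrapper is cheap, but a possible follow-up (an assumption, not part of this PR) would be to build the collector once in the constructor:

```go
// Hypothetical follow-up: cache the collector on the struct instead of
// constructing one per request.
type BlobStorage struct {
	cfg              *BlobStorageConfig
	metrics          BlobStorageMetrics
	requestCollector *instrument.HistogramCollector
	// containerURL and pipelines as above
}

func (b *BlobStorage) DeleteObject(ctx context.Context, blobID string) error {
	return instrument.CollectedRequest(ctx, "azure.DeleteObject", b.requestCollector, instrument.ErrorCode, func(ctx context.Context) error {
		// body unchanged from the diff above
		return nil
	})
}
```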
12 changes: 7 additions & 5 deletions pkg/storage/chunk/azure/blob_storage_client_test.go
@@ -21,6 +21,7 @@ func (fn RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error)
}

func Test_Hedging(t *testing.T) {
metrics := NewBlobStorageMetrics()
for _, tc := range []struct {
name string
expectedCalls int32
@@ -75,11 +76,12 @@
ContainerName: "foo",
Environment: azureGlobal,
MaxRetries: 1,
}, hedging.Config{
At: tc.hedgeAt,
UpTo: tc.upTo,
MaxPerSecond: 1000,
})
}, metrics,
hedging.Config{
At: tc.hedgeAt,
UpTo: tc.upTo,
MaxPerSecond: 1000,
})
require.NoError(t, err)
tc.do(c)
require.Equal(t, tc.expectedCalls, count.Load())
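Note that `Test_Hedging` builds its metrics once, outside the subtest table, so every `NewBlobStorage` in the loop shares the same registered collectors. A test that instead needed a fresh instance per subtest would have to unregister each one, along these lines (a sketch; `cases` and the config literal are placeholders):

```go
for _, tc := range cases {
	t.Run(tc.name, func(t *testing.T) {
		m := NewBlobStorageMetrics()
		defer m.Unregister() // avoid duplicate-registration panics across subtests

		c, err := NewBlobStorage(&BlobStorageConfig{ContainerName: "foo", Environment: azureGlobal}, m, hedging.Config{})
		require.NoError(t, err)
		_ = c
	})
}
```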