Start refactor of storage (chunks) clients metrics into structs (#5057)
* Refactor azure metrics to be a struct (+ add an overall client metrics
struct), add an azure client metric for total egress bytes.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Add changelog entry for new azure metric.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Fixes based on Kavi's review.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Fix one more merge conflict that I missed in the merge commit.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Fix failing tests.

Signed-off-by: Callum Styan <callumstyan@gmail.com>
cstyan authored Jan 25, 2022
1 parent 7713ff7 commit bfef7ba
Showing 20 changed files with 187 additions and 102 deletions.
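
The running theme in the diffs below: client metrics that used to be package-level globals, registered as an import side effect, become structs that callers construct once and thread through explicitly. A distilled sketch of the pattern in Go (generic names for illustration, not the exact Loki code — the real Azure version appears further down in blob_storage_client.go):

package metrics

import "github.com/prometheus/client_golang/prometheus"

// Before: a package-level collector registered in init(). Constructing
// the client twice panics with a duplicate-registration error, which is
// why callers such as cmd/migrate swap out prometheus.DefaultRegisterer.
//
//	var requestDuration = prometheus.NewHistogramVec(...)
//	func init() { prometheus.MustRegister(requestDuration) }

// After: the collectors live in a struct the caller owns.
type ClientMetrics struct {
	requestDuration *prometheus.HistogramVec
}

// NewClientMetrics builds the collectors and registers them with the
// default registerer.
func NewClientMetrics() ClientMetrics {
	m := ClientMetrics{
		requestDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
			Namespace: "example",
			Name:      "client_request_duration_seconds",
			Help:      "Time spent doing client requests.",
		}, []string{"operation"}),
	}
	prometheus.MustRegister(m.requestDuration)
	return m
}

// Unregister tears the collectors down again, so tests can create a
// fresh instance without hitting duplicate registration.
func (m ClientMetrics) Unregister() {
	prometheus.Unregister(m.requestDuration)
}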
5 changes: 3 additions & 2 deletions cmd/migrate/main.go
@@ -92,7 +92,8 @@ func main() {
}
// Create a new registerer to avoid registering duplicate metrics
prometheus.DefaultRegisterer = prometheus.NewRegistry()
sourceStore, err := chunk_storage.NewStore(sourceConfig.StorageConfig.Config, sourceConfig.ChunkStoreConfig.StoreConfig, sourceConfig.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util_log.Logger)
clientMetrics := chunk_storage.NewClientMetrics()
sourceStore, err := chunk_storage.NewStore(sourceConfig.StorageConfig.Config, sourceConfig.ChunkStoreConfig.StoreConfig, sourceConfig.SchemaConfig.SchemaConfig, limits, clientMetrics, prometheus.DefaultRegisterer, nil, util_log.Logger)
if err != nil {
log.Println("Failed to create source store:", err)
os.Exit(1)
@@ -105,7 +106,7 @@ func main() {

// Create a new registerer to avoid registering duplicate metrics
prometheus.DefaultRegisterer = prometheus.NewRegistry()
destStore, err := chunk_storage.NewStore(destConfig.StorageConfig.Config, destConfig.ChunkStoreConfig.StoreConfig, destConfig.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util_log.Logger)
destStore, err := chunk_storage.NewStore(destConfig.StorageConfig.Config, destConfig.ChunkStoreConfig.StoreConfig, destConfig.SchemaConfig.SchemaConfig, limits, clientMetrics, prometheus.DefaultRegisterer, nil, util_log.Logger)
if err != nil {
log.Println("Failed to create destination store:", err)
os.Exit(1)
5 changes: 3 additions & 2 deletions pkg/logcli/query/query.go
@@ -189,9 +189,10 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
if err != nil {
return err
}
storage.RegisterCustomIndexClients(&conf.StorageConfig, prometheus.DefaultRegisterer)
cm := chunk_storage.NewClientMetrics()
storage.RegisterCustomIndexClients(&conf.StorageConfig, cm, prometheus.DefaultRegisterer)
conf.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
chunkStore, err := chunk_storage.NewStore(conf.StorageConfig.Config, conf.ChunkStoreConfig.StoreConfig, conf.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util_log.Logger)
chunkStore, err := chunk_storage.NewStore(conf.StorageConfig.Config, conf.ChunkStoreConfig.StoreConfig, conf.SchemaConfig.SchemaConfig, limits, cm, prometheus.DefaultRegisterer, nil, util_log.Logger)
if err != nil {
return err
}
8 changes: 6 additions & 2 deletions pkg/loki/loki.go
@@ -43,6 +43,7 @@ import (
"github.com/grafana/loki/pkg/scheduler"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk"
chunk_storage "github.com/grafana/loki/pkg/storage/chunk/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
"github.com/grafana/loki/pkg/tracing"
"github.com/grafana/loki/pkg/util/fakeauth"
@@ -264,21 +265,24 @@ type Loki struct {
QueryFrontEndTripperware basetripper.Tripperware
queryScheduler *scheduler.Scheduler

clientMetrics chunk_storage.ClientMetrics

HTTPAuthMiddleware middleware.Interface
}

// New makes a new Loki.
func New(cfg Config) (*Loki, error) {
loki := &Loki{
Cfg: cfg,
Cfg: cfg,
clientMetrics: chunk_storage.NewClientMetrics(),
}

loki.setupAuthMiddleware()
loki.setupGRPCRecoveryMiddleware()
if err := loki.setupModuleManager(); err != nil {
return nil, err
}
storage.RegisterCustomIndexClients(&loki.Cfg.StorageConfig, prometheus.DefaultRegisterer)
storage.RegisterCustomIndexClients(&loki.Cfg.StorageConfig, loki.clientMetrics, prometheus.DefaultRegisterer)

return loki, nil
}
8 changes: 4 additions & 4 deletions pkg/loki/modules.go
@@ -380,7 +380,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {
}
}

chunkStore, err := chunk_storage.NewStore(t.Cfg.StorageConfig.Config, t.Cfg.ChunkStoreConfig.StoreConfig, t.Cfg.SchemaConfig.SchemaConfig, t.overrides, prometheus.DefaultRegisterer, nil, util_log.Logger)
chunkStore, err := chunk_storage.NewStore(t.Cfg.StorageConfig.Config, t.Cfg.ChunkStoreConfig.StoreConfig, t.Cfg.SchemaConfig.SchemaConfig, t.overrides, t.clientMetrics, prometheus.DefaultRegisterer, nil, util_log.Logger)
if err != nil {
return
}
@@ -601,7 +601,7 @@ func (t *Loki) initRulerStorage() (_ services.Service, err error) {
}
}

t.RulerStorage, err = base_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, t.Cfg.StorageConfig.Hedging, ruler.GroupLoader{}, util_log.Logger)
t.RulerStorage, err = base_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, t.Cfg.StorageConfig.Hedging, t.clientMetrics, ruler.GroupLoader{}, util_log.Logger)

return
}
@@ -701,7 +701,7 @@ func (t *Loki) initCompactor() (services.Service, error) {
if err != nil {
return nil, err
}
t.compactor, err = compactor.NewCompactor(t.Cfg.CompactorConfig, t.Cfg.StorageConfig.Config, t.Cfg.SchemaConfig, t.overrides, prometheus.DefaultRegisterer)
t.compactor, err = compactor.NewCompactor(t.Cfg.CompactorConfig, t.Cfg.StorageConfig.Config, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
@@ -718,7 +718,7 @@

func (t *Loki) initIndexGateway() (services.Service, error) {
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
objectClient, err := storage.NewObjectClient(t.Cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType, t.Cfg.StorageConfig.Config)
objectClient, err := storage.NewObjectClient(t.Cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType, t.Cfg.StorageConfig.Config, t.clientMetrics)
if err != nil {
return nil, err
}
9 changes: 7 additions & 2 deletions pkg/ruler/base/lifecycle_test.go
@@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage/chunk/storage"
"github.com/grafana/loki/pkg/util/test"
)

@@ -24,7 +25,9 @@ func TestRulerShutdown(t *testing.T) {
config, cleanup := defaultRulerConfig(t, newMockRuleStore(mockRules))
defer cleanup()

r, rcleanup := buildRuler(t, config, nil, nil)
m := storage.NewClientMetrics()
defer m.Unregister()
r, rcleanup := buildRuler(t, config, nil, m, nil)
defer rcleanup()

r.cfg.EnableSharding = true
@@ -59,7 +62,9 @@ func TestRuler_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) {
ctx := context.Background()
config, cleanup := defaultRulerConfig(t, newMockRuleStore(mockRules))
defer cleanup()
r, rcleanup := buildRuler(t, config, nil, nil)
m := storage.NewClientMetrics()
defer m.Unregister()
r, rcleanup := buildRuler(t, config, nil, m, nil)
defer rcleanup()
r.cfg.EnableSharding = true
r.cfg.Ring.HeartbeatPeriod = 100 * time.Millisecond
22 changes: 15 additions & 7 deletions pkg/ruler/base/ruler_test.go
@@ -56,6 +56,7 @@ import (
"github.com/grafana/loki/pkg/ruler/rulestore/objectclient"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/hedging"
chunk_storage "github.com/grafana/loki/pkg/storage/chunk/storage"
"github.com/grafana/loki/pkg/tenant"
)

@@ -221,9 +222,9 @@ func newMockClientsPool(cfg Config, logger log.Logger, reg prometheus.Registerer
}
}

func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.TestConfig, rulerAddrMap map[string]*Ruler) (*Ruler, func()) {
func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.TestConfig, clientMetrics chunk_storage.ClientMetrics, rulerAddrMap map[string]*Ruler) (*Ruler, func()) {
engine, queryable, pusher, logger, overrides, reg, cleanup := testSetup(t, querierTestConfig)
storage, err := NewLegacyRuleStore(rulerConfig.StoreConfig, hedging.Config{}, promRules.FileLoader{}, log.NewNopLogger())
storage, err := NewLegacyRuleStore(rulerConfig.StoreConfig, hedging.Config{}, clientMetrics, promRules.FileLoader{}, log.NewNopLogger())
require.NoError(t, err)

managerFactory := DefaultTenantManagerFactory(rulerConfig, pusher, queryable, engine, overrides, reg)
@@ -244,7 +245,9 @@ func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.Tes
}

func newTestRuler(t *testing.T, rulerConfig Config) (*Ruler, func()) {
ruler, cleanup := buildRuler(t, rulerConfig, nil, nil)
m := chunk_storage.NewClientMetrics()
defer m.Unregister()
ruler, cleanup := buildRuler(t, rulerConfig, nil, m, nil)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ruler))

// Ensure all rules are loaded before usage
@@ -415,8 +418,9 @@ func TestGetRules(t *testing.T) {
Mock: kvStore,
},
}

r, cleanUp := buildRuler(t, cfg, nil, rulerAddrMap)
m := chunk_storage.NewClientMetrics()
defer m.Unregister()
r, cleanUp := buildRuler(t, cfg, nil, m, rulerAddrMap)
r.limits = ruleLimits{evalDelay: 0, tenantShard: tc.shuffleShardSize}
t.Cleanup(cleanUp)
rulerAddrMap[id] = r
@@ -919,7 +923,9 @@ func TestSharding(t *testing.T) {
DisabledTenants: tc.disabledUsers,
}

r, cleanup := buildRuler(t, cfg, nil, nil)
m := chunk_storage.NewClientMetrics()
defer m.Unregister()
r, cleanup := buildRuler(t, cfg, nil, m, nil)
r.limits = ruleLimits{evalDelay: 0, tenantShard: tc.shuffleShardSize}
t.Cleanup(cleanup)

@@ -1276,8 +1282,10 @@ func TestRecoverAlertsPostOutage(t *testing.T) {
querier.UseAlwaysQueryable(querier.NewChunkStoreQueryable(querierConfig, &emptyChunkStore{})),
}

m := chunk_storage.NewClientMetrics()
defer m.Unregister()
// create a ruler but don't start it. instead, we'll evaluate the rule groups manually.
r, rcleanup := buildRuler(t, rulerCfg, &querier.TestConfig{Cfg: querierConfig, Distributor: d, Stores: queryables}, nil)
r, rcleanup := buildRuler(t, rulerCfg, &querier.TestConfig{Cfg: querierConfig, Distributor: d, Stores: queryables}, m, nil)
r.syncRules(context.Background(), rulerSyncReasonInitial)
defer rcleanup()

5 changes: 3 additions & 2 deletions pkg/ruler/base/storage.go
@@ -25,6 +25,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/gcp"
"github.com/grafana/loki/pkg/storage/chunk/hedging"
"github.com/grafana/loki/pkg/storage/chunk/openstack"
"github.com/grafana/loki/pkg/storage/chunk/storage"
)

// RuleStoreConfig configures a rule store.
@@ -77,7 +78,7 @@ func (cfg *RuleStoreConfig) IsDefaults() bool {
// NewLegacyRuleStore returns a rule store backend client based on the provided cfg.
// The client used by the function is based on legacy object store clients that shouldn't
// be used anymore.
func NewLegacyRuleStore(cfg RuleStoreConfig, hedgeCfg hedging.Config, loader promRules.GroupLoader, logger log.Logger) (rulestore.RuleStore, error) {
func NewLegacyRuleStore(cfg RuleStoreConfig, hedgeCfg hedging.Config, clientMetrics storage.ClientMetrics, loader promRules.GroupLoader, logger log.Logger) (rulestore.RuleStore, error) {
if cfg.mock != nil {
return cfg.mock, nil
}
@@ -97,7 +98,7 @@ func NewLegacyRuleStore(cfg RuleStoreConfig, hedgeCfg hedging.Config, loader pro
}
return configdb.NewConfigRuleStore(c), nil
case "azure":
client, err = azure.NewBlobStorage(&cfg.Azure, hedgeCfg)
client, err = azure.NewBlobStorage(&cfg.Azure, clientMetrics.AzureMetrics, hedgeCfg)
case "gcs":
client, err = gcp.NewGCSObjectClient(context.Background(), cfg.GCS, hedgeCfg)
case "s3":
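The ClientMetrics type threaded through the call sites above is defined in one of the changed files that isn't expanded in this view. A plausible sketch of its shape, inferred from the call sites (chunk_storage.NewClientMetrics(), m.Unregister(), and clientMetrics.AzureMetrics above) — treat the exact layout as an assumption:

package storage

import "github.com/grafana/loki/pkg/storage/chunk/azure"

// ClientMetrics groups the per-client metric structs so callers can
// construct and unregister them as a unit. Sketched from usage in this
// diff; the actual definition may hold more fields.
type ClientMetrics struct {
	AzureMetrics azure.BlobStorageMetrics
}

func NewClientMetrics() ClientMetrics {
	return ClientMetrics{
		AzureMetrics: azure.NewBlobStorageMetrics(),
	}
}

func (c *ClientMetrics) Unregister() {
	c.AzureMetrics.Unregister()
}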
72 changes: 44 additions & 28 deletions pkg/storage/chunk/azure/blob_storage_client.go
@@ -36,26 +36,6 @@ const (
azureUSGovernment = "AzureUSGovernment"
)

var requestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "loki",
Name: "azure_blob_request_duration_seconds",
Help: "Time spent doing azure blob requests.",
// Latency seems to range from a few ms to a few secs and is
// important. So use 6 buckets from 5ms to 5s.
Buckets: prometheus.ExponentialBuckets(0.005, 4, 6),
}, []string{"operation", "status_code"}))

var egressBytesTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "loki",
Name: "azure_blob_egress_bytes_total",
Help: "Total bytes downloaded from Azure Blob Storage.",
})

func init() {
requestDuration.Register()
prometheus.MustRegister(egressBytesTotal)
}

var (
supportedEnvironments = []string{azureGlobal, azureChinaCloud, azureGermanCloud, azureUSGovernment}
noClientKey = azblob.ClientProvidedKeyOptions{}
@@ -154,23 +134,60 @@ func (c *BlobStorageConfig) ToCortexAzureConfig() cortex_azure.BlobStorageConfig
}
}

type BlobStorageMetrics struct {
requestDuration *prometheus.HistogramVec
egressBytesTotal prometheus.Counter
}

// NewBlobStorageMetrics creates the blob storage metrics struct and registers all of its metrics.
func NewBlobStorageMetrics() BlobStorageMetrics {
b := BlobStorageMetrics{
requestDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "loki",
Name: "azure_blob_request_duration_seconds",
Help: "Time spent doing azure blob requests.",
// Latency seems to range from a few ms to a few secs and is
// important. So use 6 buckets from 5ms to 5s.
Buckets: prometheus.ExponentialBuckets(0.005, 4, 6),
}, []string{"operation", "status_code"}),
egressBytesTotal: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "loki",
Name: "azure_blob_egress_bytes_total",
Help: "Total bytes downloaded from Azure Blob Storage.",
}),
}
prometheus.MustRegister(b.requestDuration)
prometheus.MustRegister(b.egressBytesTotal)
return b
}

// Unregister unregisters the blob storage metrics from the prometheus default registerer, which is useful for tests
// that frequently create multiple instances of the metrics struct, but is not needed in normal operation.
func (bm *BlobStorageMetrics) Unregister() {
prometheus.Unregister(bm.requestDuration)
prometheus.Unregister(bm.egressBytesTotal)
}

// BlobStorage is used to interact with azure blob storage for setting or getting time series chunks.
// Implements ObjectStorage
type BlobStorage struct {
// blobService storage.Serv
cfg *BlobStorageConfig

metrics BlobStorageMetrics

containerURL azblob.ContainerURL

pipeline pipeline.Pipeline
hedgingPipeline pipeline.Pipeline
}

// NewBlobStorage creates a new instance of the BlobStorage struct.
func NewBlobStorage(cfg *BlobStorageConfig, hedgingCfg hedging.Config) (*BlobStorage, error) {
func NewBlobStorage(cfg *BlobStorageConfig, metrics BlobStorageMetrics, hedgingCfg hedging.Config) (*BlobStorage, error) {
log.WarnExperimentalUse("Azure Blob Storage", log.Logger)
blobStorage := &BlobStorage{
cfg: cfg,
cfg: cfg,
metrics: metrics,
}
pipeline, err := blobStorage.newPipeline(hedgingCfg, false)
if err != nil {
@@ -204,13 +221,12 @@ func (b *BlobStorage) GetObject(ctx context.Context, objectKey string) (io.ReadC
size int64
rc io.ReadCloser
)
err := instrument.CollectedRequest(ctx, "azure.GetObject", requestDuration, instrument.ErrorCode, func(ctx context.Context) error {
err := instrument.CollectedRequest(ctx, "azure.GetObject", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error {
var err error
rc, size, err = b.getObject(ctx, objectKey)
return err
})
// Assume even failed requests' egress bytes count towards rate limits and count them here.
egressBytesTotal.Add(float64(size))
b.metrics.egressBytesTotal.Add(float64(size))
if err != nil {
// cancel the context if there is an error.
cancel()
@@ -236,7 +252,7 @@ func (b *BlobStorage) getObject(ctx context.Context, objectKey string) (rc io.Re
}

func (b *BlobStorage) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
return instrument.CollectedRequest(ctx, "azure.PutObject", requestDuration, instrument.ErrorCode, func(ctx context.Context) error {
return instrument.CollectedRequest(ctx, "azure.PutObject", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error {
blockBlobURL, err := b.getBlobURL(objectKey, false)
if err != nil {
return err
@@ -380,7 +396,7 @@ func (b *BlobStorage) List(ctx context.Context, prefix, delimiter string) ([]chu
return nil, nil, ctx.Err()
}

err := instrument.CollectedRequest(ctx, "azure.List", requestDuration, instrument.ErrorCode, func(ctx context.Context) error {
err := instrument.CollectedRequest(ctx, "azure.List", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error {
listBlob, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, delimiter, azblob.ListBlobsSegmentOptions{Prefix: prefix})
if err != nil {
return err
Expand Down Expand Up @@ -413,7 +429,7 @@ func (b *BlobStorage) List(ctx context.Context, prefix, delimiter string) ([]chu
}

func (b *BlobStorage) DeleteObject(ctx context.Context, blobID string) error {
return instrument.CollectedRequest(ctx, "azure.DeleteObject", requestDuration, instrument.ErrorCode, func(ctx context.Context) error {
return instrument.CollectedRequest(ctx, "azure.DeleteObject", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error {
blockBlobURL, err := b.getBlobURL(blobID, false)
if err != nil {
return err
12 changes: 7 additions & 5 deletions pkg/storage/chunk/azure/blob_storage_client_test.go
@@ -21,6 +21,7 @@ func (fn RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error)
}

func Test_Hedging(t *testing.T) {
metrics := NewBlobStorageMetrics()
for _, tc := range []struct {
name string
expectedCalls int32
@@ -75,11 +76,12 @@
ContainerName: "foo",
Environment: azureGlobal,
MaxRetries: 1,
}, hedging.Config{
At: tc.hedgeAt,
UpTo: tc.upTo,
MaxPerSecond: 1000,
})
}, metrics,
hedging.Config{
At: tc.hedgeAt,
UpTo: tc.upTo,
MaxPerSecond: 1000,
})
require.NoError(t, err)
tc.do(c)
require.Equal(t, tc.expectedCalls, count.Load())
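The Unregister hook is what lets the tests above build metrics repeatedly without swapping the global registry. A minimal hypothetical test (not part of this commit) illustrating the behavior it enables:

package azure

import "testing"

func TestBlobStorageMetricsLifecycle(t *testing.T) {
	// The first instance registers its collectors with the default registerer.
	m1 := NewBlobStorageMetrics()
	m1.Unregister()

	// Because the first instance was unregistered, constructing a second one
	// does not panic with "duplicate metrics collector registration attempted".
	m2 := NewBlobStorageMetrics()
	m2.Unregister()
}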