Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Add namespace runtime options for runtime per-namespace config changes #2446

Merged
merged 46 commits into from
Jul 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
9992e3d
[dbnode] Add namespace runtime options manager
robskillington Jul 4, 2020
aac4198
Respect flush indexing concurrency runtime options
robskillington Jul 6, 2020
7409332
Merge branch 'master' into r/namespace-runtime-options
robskillington Jul 6, 2020
001164b
Merge branch 'master' into r/namespace-runtime-options
robskillington Jul 6, 2020
c321ffc
Fix fn pointer assignment
robskillington Jul 6, 2020
8779db1
Revert test files
robskillington Jul 6, 2020
c228639
Update runtime options
robskillington Jul 6, 2020
fbe1cec
Add note
robskillington Jul 6, 2020
dd4fcc8
Share underlying indexing resources across segment builders to bound …
robskillington Jul 6, 2020
ef41525
Reset sharded jobs after use
robskillington Jul 6, 2020
f8e5049
Fix upsert options endpoint
robskillington Jul 6, 2020
b069000
Reset docs batch between flushing batch during warm index flush
robskillington Jul 6, 2020
2f848fa
Merge branch 'master' into r/namespace-runtime-options
robskillington Jul 8, 2020
fd722e2
Merge branch 'master' into r/namespace-runtime-options
robskillington Jul 8, 2020
94564b8
Fix update namespace flow and also do not attempt index flush when no…
robskillington Jul 8, 2020
a264484
Add log for updating index runtime options
robskillington Jul 8, 2020
dd00127
Use context pool for FST readers, also fix avoid unnecessary locking …
robskillington Jul 9, 2020
e5a7dd5
Merge branch 'master' into r/namespace-runtime-options
robskillington Jul 16, 2020
62b8f68
Merge branch 'master' into r/namespace-runtime-options
robskillington Jul 17, 2020
09bc443
Address feedback
robskillington Jul 17, 2020
bbfb16a
Address feedback
robskillington Jul 18, 2020
5e3e9c6
Fix concurrency issue
robskillington Jul 18, 2020
a687e0f
Fix tests
robskillington Jul 18, 2020
8acd924
Fix tests
robskillington Jul 18, 2020
2ef0f02
Fix test build
robskillington Jul 18, 2020
296d834
Fix not closing listener
robskillington Jul 18, 2020
38039ac
Fix build
robskillington Jul 18, 2020
8f4daee
Use per CPU queue for shard insert queue
robskillington Jul 19, 2020
aa07507
Less locks in shard insert queue
robskillington Jul 19, 2020
7116768
Fix uninit var
robskillington Jul 19, 2020
8ec78e0
Single insertion for shard insert queue batch exec
robskillington Jul 19, 2020
09fcc34
Write bootstrap data to side-buffer then rotate/merge after bootstrap…
robskillington Jul 20, 2020
e8f25ea
With higher tchannel send buffer size
robskillington Jul 20, 2020
3cb6f0e
Rate limit reading index files
robskillington Jul 20, 2020
12c467e
Avoid applying rate limit currently
robskillington Jul 21, 2020
4518df4
Revert "Avoid applying rate limit currently"
robskillington Jul 22, 2020
9ab8944
Revert "Rate limit reading index files"
robskillington Jul 22, 2020
63f1f47
Do not autovalidate index segments on boot
robskillington Jul 22, 2020
83a89d1
Check is sealed directly rather than inferring block is already closed
robskillington Jul 22, 2020
3421885
Synchronous lock on bootstrap series insert for faster bootstrap
robskillington Jul 22, 2020
95a9b05
Fix insertSeriesSync
robskillington Jul 22, 2020
2a38d80
Address feedback
robskillington Jul 22, 2020
bc253fa
Add FST writer options to FST writer
robskillington Jul 22, 2020
5c45949
Merge branch 'master' into r/namespace-runtime-options
robskillington Jul 22, 2020
1b9a463
Fix test and revise tchannel send frame default value
robskillington Jul 22, 2020
921551a
Merge branch 'r/namespace-runtime-options' of github.com:m3db/m3 into…
robskillington Jul 22, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ install-tools:
GOBIN=$(tools_bin_path) go install github.com/mauricelam/genny
GOBIN=$(tools_bin_path) go install github.com/mjibson/esc
GOBIN=$(tools_bin_path) go install github.com/pointlander/peg
GOBIN=$(tools_bin_path) go install github.com/prateek/gorename
GOBIN=$(tools_bin_path) go install github.com/robskillington/gorename
GOBIN=$(tools_bin_path) go install github.com/rakyll/statik
GOBIN=$(tools_bin_path) go install github.com/garethr/kubeval

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ require (
github.com/pointlander/jetset v1.0.0 // indirect
github.com/pointlander/peg v1.0.0
github.com/prashantv/protectmem v0.0.0-20171002184600-e20412882b3a // indirect
github.com/prateek/gorename v0.0.0-20180424020013-52c7307cddd2
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/common v0.9.1
github.com/prometheus/prometheus v1.8.2-0.20200420081721-18254838fbe2
github.com/rakyll/statik v0.1.6
github.com/remeh/sizedwaitgroup v1.0.0 // indirect
github.com/rhysd/go-github-selfupdate v1.2.2 // indirect
github.com/robskillington/gorename v0.0.0-20180424020013-52c7307cddd2
github.com/russross/blackfriday v2.0.0+incompatible
github.com/rveen/ogdl v0.0.0-20200522080342-eeeda1a978e7 // indirect
github.com/satori/go.uuid v1.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -604,8 +604,6 @@ github.com/pointlander/peg v1.0.0/go.mod h1:WJTMcgeWYr6fZz4CwHnY1oWZCXew8GWCF93F
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/prashantv/protectmem v0.0.0-20171002184600-e20412882b3a h1:AA9vgIBDjMHPC2McaGPojgV2dcI78ZC0TLNhYCXEKH8=
github.com/prashantv/protectmem v0.0.0-20171002184600-e20412882b3a/go.mod h1:lzZQ3Noex5pfAy7mkAeCjcBDteYU85uWWnJ/y6gKU8k=
github.com/prateek/gorename v0.0.0-20180424020013-52c7307cddd2 h1:/pJs9wFXnmhD12W+dnwoNJXPtLsxa78y+vzC9i/Hs+A=
github.com/prateek/gorename v0.0.0-20180424020013-52c7307cddd2/go.mod h1:nw9dXugFBQe1pshgrdeRjyYrY+RxNZckdWkiGHX8URE=
github.com/prometheus/alertmanager v0.20.0/go.mod h1:9g2i48FAyZW6BtbsnvHtMHQXl2aVtrORKwKVCQ+nbrg=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs=
Expand Down Expand Up @@ -641,6 +639,8 @@ github.com/remeh/sizedwaitgroup v1.0.0 h1:VNGGFwNo/R5+MJBf6yrsr110p0m4/OX4S3DCy7
github.com/remeh/sizedwaitgroup v1.0.0/go.mod h1:3j2R4OIe/SeS6YDhICBy22RWjJC5eNCJ1V+9+NVNYlo=
github.com/rhysd/go-github-selfupdate v1.2.2 h1:G+mNzkc1wEtpmM6sFS/Ghkeq+ad4Yp6EZEHyp//wGEo=
github.com/rhysd/go-github-selfupdate v1.2.2/go.mod h1:khesvSyKcXDUxeySCedFh621iawCks0dS/QnHPcpCws=
github.com/robskillington/gorename v0.0.0-20180424020013-52c7307cddd2 h1:t+C9QFlvAI+evRn96lz7eKyzo1CgDx3YVx3N/GJIetk=
github.com/robskillington/gorename v0.0.0-20180424020013-52c7307cddd2/go.mod h1:CVTJ4xwzb/4H98jrd7NFgNoTAiL63scr2Pl7kqOcQAQ=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.1.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
Expand Down
12 changes: 6 additions & 6 deletions src/cmd/services/m3coordinator/downsample/downsample_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 27 additions & 22 deletions src/cmd/services/m3coordinator/downsample/downsampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package downsample
import (
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

Expand All @@ -35,9 +34,14 @@ type Downsampler interface {
// MetricsAppender is a metrics appender that can build a samples
// appender, only valid to use with a single caller at a time.
type MetricsAppender interface {
// NextMetric progresses to building the next metric.
NextMetric()
// AddTag adds a tag to the current metric being built.
AddTag(name, value []byte)
// SamplesAppender returns a samples appender for the current
// metric built with the tags that have been set.
SamplesAppender(opts SampleAppenderOptions) (SamplesAppenderResult, error)
Reset()
// Finalize finalizes the entire metrics appender for reuse.
Finalize()
}

Expand Down Expand Up @@ -72,11 +76,9 @@ type SamplesAppender interface {
}

type downsampler struct {
opts DownsamplerOptions
agg agg

debugLogging bool
logger *zap.Logger
opts DownsamplerOptions
agg agg
metricsAppenderOpts metricsAppenderOptions
}

type downsamplerOptions struct {
Expand All @@ -95,24 +97,27 @@ func newDownsampler(opts downsamplerOptions) (*downsampler, error) {
debugLogging = true
}

metricsAppenderOpts := metricsAppenderOptions{
agg: opts.agg.aggregator,
clientRemote: opts.agg.clientRemote,
defaultStagedMetadatasProtos: opts.agg.defaultStagedMetadatasProtos,
clockOpts: opts.agg.clockOpts,
tagEncoderPool: opts.agg.pools.tagEncoderPool,
matcher: opts.agg.matcher,
metricTagsIteratorPool: opts.agg.pools.metricTagsIteratorPool,
debugLogging: debugLogging,
logger: logger,
}

return &downsampler{
opts: opts.opts,
agg: opts.agg,
debugLogging: debugLogging,
logger: logger,
opts: opts.opts,
agg: opts.agg,
metricsAppenderOpts: metricsAppenderOpts,
}, nil
}

func (d *downsampler) NewMetricsAppender() (MetricsAppender, error) {
return newMetricsAppender(metricsAppenderOptions{
agg: d.agg.aggregator,
clientRemote: d.agg.clientRemote,
defaultStagedMetadatasProtos: d.agg.defaultStagedMetadatasProtos,
clockOpts: d.agg.clockOpts,
tagEncoder: d.agg.pools.tagEncoderPool.Get(),
matcher: d.agg.matcher,
metricTagsIteratorPool: d.agg.pools.metricTagsIteratorPool,
debugLogging: d.debugLogging,
logger: d.logger,
}), nil
metricsAppender := d.agg.pools.metricsAppenderPool.Get()
metricsAppender.reset(d.metricsAppenderOpts)
return metricsAppender, nil
}
34 changes: 22 additions & 12 deletions src/cmd/services/m3coordinator/downsample/downsampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,8 @@ func testDownsamplerAggregationIngest(
opts = *testOpts.sampleAppenderOpts
}
for _, metric := range testCounterMetrics {
appender.Reset()
appender.NextMetric()

for name, value := range metric.tags {
appender.AddTag([]byte(name), []byte(value))
}
Expand All @@ -1016,7 +1017,8 @@ func testDownsamplerAggregationIngest(
}
}
for _, metric := range testGaugeMetrics {
appender.Reset()
appender.NextMetric()

for name, value := range metric.tags {
appender.AddTag([]byte(name), []byte(value))
}
Expand Down Expand Up @@ -1116,13 +1118,20 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl
tagEncoderOptions := serialize.NewTagEncoderOptions()
tagDecoderOptions := serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{})
tagEncoderPoolOptions := pool.NewObjectPoolOptions().
SetSize(2).
SetInstrumentOptions(instrumentOpts.
SetMetricsScope(instrumentOpts.MetricsScope().
SubScope("tag-encoder-pool")))
tagDecoderPoolOptions := pool.NewObjectPoolOptions().
SetSize(2).
SetInstrumentOptions(instrumentOpts.
SetMetricsScope(instrumentOpts.MetricsScope().
SubScope("tag-decoder-pool")))
metricsAppenderPoolOptions := pool.NewObjectPoolOptions().
SetSize(2).
SetInstrumentOptions(instrumentOpts.
SetMetricsScope(instrumentOpts.MetricsScope().
SubScope("metrics-appender-pool")))

var cfg Configuration
if opts.remoteClientMock != nil {
Expand All @@ -1137,16 +1146,17 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl
}

instance, err := cfg.NewDownsampler(DownsamplerOptions{
Storage: storage,
ClusterClient: clusterclient.NewMockClient(gomock.NewController(t)),
RulesKVStore: rulesKVStore,
AutoMappingRules: opts.autoMappingRules,
ClockOptions: clockOpts,
InstrumentOptions: instrumentOpts,
TagEncoderOptions: tagEncoderOptions,
TagDecoderOptions: tagDecoderOptions,
TagEncoderPoolOptions: tagEncoderPoolOptions,
TagDecoderPoolOptions: tagDecoderPoolOptions,
Storage: storage,
ClusterClient: clusterclient.NewMockClient(gomock.NewController(t)),
RulesKVStore: rulesKVStore,
AutoMappingRules: opts.autoMappingRules,
ClockOptions: clockOpts,
InstrumentOptions: instrumentOpts,
TagEncoderOptions: tagEncoderOptions,
TagDecoderOptions: tagDecoderOptions,
TagEncoderPoolOptions: tagEncoderPoolOptions,
TagDecoderPoolOptions: tagDecoderPoolOptions,
MetricsAppenderPoolOptions: metricsAppenderPoolOptions,
})
require.NoError(t, err)

Expand Down
65 changes: 54 additions & 11 deletions src/cmd/services/m3coordinator/downsample/metrics_appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,47 @@ import (
"github.com/m3db/m3/src/metrics/metadata"
"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/serialize"

"github.com/golang/protobuf/jsonpb"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

type metricsAppenderPool struct {
pool pool.ObjectPool
}

func newMetricsAppenderPool(opts pool.ObjectPoolOptions) *metricsAppenderPool {
p := &metricsAppenderPool{
pool: pool.NewObjectPool(opts),
}
p.pool.Init(func() interface{} {
return newMetricsAppender(p)
})
return p
}

func (p *metricsAppenderPool) Get() *metricsAppender {
return p.pool.Get().(*metricsAppender)
}

func (p *metricsAppenderPool) Put(v *metricsAppender) {
p.pool.Put(v)
}

type metricsAppender struct {
metricsAppenderOptions

pool *metricsAppenderPool

tags *tags
multiSamplesAppender *multiSamplesAppender
curr metadata.StagedMetadata
defaultStagedMetadatasCopies []metadata.StagedMetadatas
mappingRuleStoragePolicies []policy.StoragePolicy
tagEncoder serialize.TagEncoder
}

// metricsAppenderOptions will have one of agg or clientRemote set.
Expand All @@ -57,23 +83,40 @@ type metricsAppenderOptions struct {
clientRemote client.Client

defaultStagedMetadatasProtos []metricpb.StagedMetadatas
tagEncoder serialize.TagEncoder
matcher matcher.Matcher
tagEncoderPool serialize.TagEncoderPool
metricTagsIteratorPool serialize.MetricTagsIteratorPool

clockOpts clock.Options
debugLogging bool
logger *zap.Logger
}

func newMetricsAppender(opts metricsAppenderOptions) *metricsAppender {
stagedMetadatasCopies := make([]metadata.StagedMetadatas,
len(opts.defaultStagedMetadatasProtos))
func newMetricsAppender(pool *metricsAppenderPool) *metricsAppender {
return &metricsAppender{
metricsAppenderOptions: opts,
tags: newTags(),
multiSamplesAppender: newMultiSamplesAppender(),
defaultStagedMetadatasCopies: stagedMetadatasCopies,
pool: pool,
tags: newTags(),
multiSamplesAppender: newMultiSamplesAppender(),
}
}

// reset is called when pulled from the pool.
func (a *metricsAppender) reset(opts metricsAppenderOptions) {
a.metricsAppenderOptions = opts
if a.tagEncoder == nil {
a.tagEncoder = opts.tagEncoderPool.Get()
}

// Make sure a.defaultStagedMetadatasCopies is right length.
capRequired := len(opts.defaultStagedMetadatasProtos)
if cap(a.defaultStagedMetadatasCopies) < capRequired {
// Too short, reallocate.
slice := make([]metadata.StagedMetadatas, capRequired)
a.defaultStagedMetadatasCopies = slice
} else {
// Has enough capacity, take subslice.
slice := a.defaultStagedMetadatasCopies[:capRequired]
a.defaultStagedMetadatasCopies = slice
}
}

Expand Down Expand Up @@ -317,14 +360,14 @@ func (a *metricsAppender) debugLogMatch(str string, opts debugLogMatchOptions) {
a.logger.Debug(str, fields...)
}

func (a *metricsAppender) Reset() {
func (a *metricsAppender) NextMetric() {
a.tags.names = a.tags.names[:0]
a.tags.values = a.tags.values[:0]
}

func (a *metricsAppender) Finalize() {
a.tagEncoder.Finalize()
a.tagEncoder = nil
// Return to pool.
a.pool.Put(a)
}

func stagedMetadatasLogField(sm metadata.StagedMetadatas) zapcore.Field {
Expand Down
Loading