[aggregator] Simplify (Active)StagedPlacement API #3199
Conversation
Codecov Report
@@ Coverage Diff @@
## master #3199 +/- ##
=========================================
- Coverage 72.2% 72.2% -0.1%
=========================================
Files 1087 1087
Lines 100714 100686 -28
=========================================
- Hits 72816 72774 -42
- Misses 22838 22846 +8
- Partials 5060 5066 +6
|
@@ -34,38 +33,36 @@ import (
var (
	errNoApplicablePlacement       = errors.New("no applicable placement found")
	errActiveStagedPlacementClosed = errors.New("active staged placement is closed")
	errPlacementInvalidType        = errors.New("type assertion failed, corrupt placement")
nit: type assertion is an implementation detail, this can probably just be "corrupt placement"
)

type activeStagedPlacement struct {
	sync.RWMutex
var _noPlacements Placements
nit: group this with the errors, just put a break between 'em
	placements            atomic.Value
	version               int
	nowFn                 clock.NowFn
	onPlacementsAddedFn   OnPlacementsAddedFn
	onPlacementsRemovedFn OnPlacementsRemovedFn

	expiring atomic.Int32
	closed   bool
	doneFn   DoneFn
	expiring atomic.Int32
	closed   atomic.Bool
probably not much value in memory-aligning, so mind sorting these?
		return errActiveStagedPlacementClosed
	}
	if p.onPlacementsRemovedFn != nil {
		p.onPlacementsRemovedFn(p.placements)
	pl, ok := p.placements.Load().(Placements)
do you think it's worth adding a quick note that we understand that `pl` is still subject to mutability races, it's expected based on historical design, something to that effect?

func (p *activeStagedPlacement) activePlacementWithLock(timeNanos int64) (Placement, error) {
	if p.closed {
	if p.closed.Load() {
nit: do this before loading the placements
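The suggested ordering, checking the `closed` flag before loading placements, can be sketched as below. This is an illustrative stand-in struct, not the actual m3 type; only the field and error names mirror the diff:

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errActiveStagedPlacementClosed = errors.New("active staged placement is closed")

// stagedPlacement is a hypothetical, simplified version of the struct in the
// diff, keeping just the two fields this ordering question touches.
type stagedPlacement struct {
	closed     atomic.Bool
	placements atomic.Value // holds Placements; []string in this sketch
}

// activePlacement checks closed first, so a closed instance returns early
// without performing the placements load at all.
func (p *stagedPlacement) activePlacement() ([]string, error) {
	if p.closed.Load() {
		return nil, errActiveStagedPlacementClosed
	}
	pl, _ := p.placements.Load().([]string)
	return pl, nil
}

func main() {
	var p stagedPlacement
	p.placements.Store([]string{"a"})
	pl, err := p.activePlacement()
	fmt.Println(pl, err)
	p.closed.Store(true)
	_, err = p.activePlacement()
	fmt.Println(err)
}
```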
@@ -34,38 +33,36 @@ import (
var (
	errNoApplicablePlacement       = errors.New("no applicable placement found")
	errActiveStagedPlacementClosed = errors.New("active staged placement is closed")
	errPlacementInvalidType        = errors.New("corrupt placement")

	_noPlacements Placements
ultra-nit: `_emptyPlacements` would read better.
* master: (30 commits)
  [dbnode] Use go context to cancel index query workers after timeout (#3194)
  [aggregator] Fix change ActivePlacement semantics on close (#3201)
  [aggregator] Simplify (Active)StagedPlacement API (#3199)
  [aggregator] Checking if metadata is set to default should not cause copying (#3198)
  [dbnode] Remove readers and writer from aggregator API (#3122)
  [aggregator] Avoid large copies in entry rollup comparisons by making them more inline-friendly (#3195)
  [dbnode] Re-add aggregator doc limit update (#3137)
  [m3db] Do not close reader in filterFieldsIterator.Close() (#3196)
  Revert "Remove disk series read limit (#3174)" (#3193)
  [instrument] Improve sampled timer and stopwatch performance (#3191)
  Omit unset fields in metadata json (#3189)
  [dbnode] Remove left-over code in storage/bootstrap/bootstrapper (#3190)
  [dbnode][coordinator] Support match[] in label endpoints (#3180)
  Instrument the worker pool with the wait time (#3188)
  Instrument query path (#3182)
  [aggregator] Remove indirection, large copy from unaggregated protobuf decoder (#3186)
  [aggregator] Sample timers completely (#3184)
  [aggregator] Reduce error handling overhead in rawtcp server (#3183)
  [aggregator] Move shardID calculation out of critical section (#3179)
  Move instrumentation cleanup to FetchTaggedResultIterator Close() (#3173)
  ...
What this PR does / why we need it:
This simplifies the staged placement API and significantly reduces contention (the placement is fetched on every metric write).
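To illustrate the contention claim, here is a rough sketch contrasting the two read paths. These are hypothetical stand-in types, not the m3 code: with an `RWMutex` every reader on the hot write path touches the lock word, while an `atomic.Value` load involves no locking at all:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// mutexStore guards reads with an RWMutex: each reader acquires and releases
// the lock, so a hot read path contends on the lock word even with no writer.
type mutexStore struct {
	mu sync.RWMutex
	pl []int
}

func (s *mutexStore) get() []int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.pl
}

// atomicStore publishes the slice through an atomic.Value: readers perform a
// single atomic load, which is the contention reduction the PR is after.
type atomicStore struct{ pl atomic.Value }

func (s *atomicStore) set(pl []int) { s.pl.Store(pl) }

func (s *atomicStore) get() []int {
	pl, _ := s.pl.Load().([]int)
	return pl
}

func main() {
	m := &mutexStore{pl: []int{1, 2}}
	a := &atomicStore{}
	a.set([]int{1, 2})
	fmt.Println(len(m.get()), len(a.get()))
}
```

The trade-off is that `atomic.Value` only publishes a whole new value; in-place mutation of the stored slice still needs external coordination, which matches the reviewer's note about `pl` remaining subject to mutability races.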
Special notes for your reviewer:
Does this PR introduce a user-facing and/or backwards incompatible change?:
Does this PR require updating code package or user-facing documentation?: