-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement composite watchable config map provider #4391
Implement composite watchable config map provider #4391
Conversation
6bd7ea3
to
834f18e
Compare
I will check the test failure, it probably caught a race bug. |
There is indeed a race bug, which the test caught. Fixing it and will also rebase. |
return cr.composedMap | ||
} | ||
|
||
func (cr *compositeRetrieved) WatchForUpdate() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a much simpler implementation also after I did #4370 which solved few bugs, is to change the Provider interface and instead pass a "chan" or even better a func pointer:
// Provider is an interface that helps providing configuration's parser.
// Implementations may load the parser from a file, a database or any other source.
type Provider interface {
// Retrieve goes to the configuration source and retrieves the selected data which
// contains the value to be injected in the configuration and the corresponding watcher that
// will be used to monitor for updates of the retrieved value.
Retrieve(ctx context.Context, watcher func(Event{})) (Retrieved, error)
// Close signals that the configuration for which it was used to retrieve values is no longer in use
// and the object should close and release any watchers that it may have created.
// This method must be called when the service ends, either in case of success or error.
Shutdown(ctx context.Context) error
}
// Retrieved holds the result of a call to the Retrieve method of a Session object.
type Retrieved interface {
// Get returns the Map.
Get() *config.Map
// Close signals that the configuration for which it was used to retrieve values is no longer in use
// and the object should close and release any watchers that it may have created.
// Implementations should not call the "watcher" func after this func returned.
Close(ctx context.Context) error
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bogdandrutu I posted a PR that does this, see #4403
I like it, I believe it simplifies watching for changes. I suggest that we close this PR and continue further work with the callback approach.
The new compositeMapProvider implements a generic map provider that can be composed of subproviders and also implements a watchable interface that can be triggered by any subprovider. The ExpandMapProvider and MergeMapProvider are rewritten using the compositeMapProvider with custom compose function. This is necessary to implement file watching in fileMapProvider. I have that part ready as well, but it won't work until the providers that contain it are watchable (which this change adds). There is a fair but of concurrent code in this change, it would be great to have another pair of eyes to check I don't have races and other bugs here.
834f18e
to
b15ee6f
Compare
Codecov Report
@@ Coverage Diff @@
## main #4391 +/- ##
==========================================
+ Coverage 90.31% 90.42% +0.11%
==========================================
Files 177 178 +1
Lines 10338 10459 +121
==========================================
+ Hits 9337 9458 +121
- Misses 779 780 +1
+ Partials 222 221 -1
Continue to review full report at Codecov.
|
|
||
// configComposeFunc combines a current config "orig" with another config "add". | ||
// The resulting config is stored in "orig". | ||
type configComposeFunc func(orig *config.Map, add *config.Map) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Confusing "orig" and "add" names.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This type needs to be Exported.
// | ||
// If the update is successful then the returned error will be nil. | ||
// If Close() was called while WatchForUpdate() is in progress then the returned | ||
// error will be configsource.ErrSessionClosed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
separate PR to move this in this package, maybe an issue to track it?
// At least one watcher fired. Close all sub-Retrieved. We don't need | ||
// to close the one that fired the signal (readyWatchable.watchable), | ||
// so provide it as an exception to closeAllExcept(). | ||
if err := cr.closeAllExcept(cr.watchContext, readyWatchable.watchable); err != nil { | ||
return err | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why you don't close that as well. I am not sure that we want the limitation to be when WatchForUpdate
returns to also close, maybe we want to WatchForUpdate
again? If this is the case you need to document that when returns Close is not needed to be called.
if err == nil { | ||
// All Watchables returned without error, which means they are ready for | ||
// another Get() call. Call Get() for all subproviders again and combine | ||
// the results. | ||
configMap, err2 := cr.subRetrieved.getAllAndCompose(cr.composeFunc) | ||
if err2 != nil { | ||
return err2 | ||
} | ||
|
||
// Remember the result. | ||
cr.composedMap = configMap | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is an overoptimization. You just signal the caller that you got an update and let the collector redo the map etc. This will simplify the requirements of concurrent calls, and also the logic here.
// r := mapProvider.Retrieve() | | ||
// r.Get() | | ||
// r.WatchForUpdate() blocks until there are updates | | ||
// ... | | ||
// r.Get() | | ||
// r.WatchForUpdate() | Close() can be called anytime after Retrieve() returns |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not what we have in the collector right now. We call r.Close
after everyr.WatchForUpdate()
then mapProvider.Retrieve()
again.
Let's not merge this. I think it is preferable to go this way #4403 and after that re-implement the composite provider if needed. |
This PR was marked stale due to lack of activity. It will be closed in 7 days. |
This PR was marked stale due to lack of activity. It will be closed in 7 days. |
@tigrannajaryan I think this PR reached his goal and we learned/implemented from it. Do we still need anything from here? |
No, it is not needed anymore. |
The new compositeMapProvider implements a generic map provider
that can be composed of subproviders and also implements a
watchable interface that can be triggered by any subprovider.
The ExpandMapProvider and MergeMapProvider are rewritten using
the compositeMapProvider with custom compose function.
This is necessary to implement file watching in fileMapProvider.
I have that part ready as well, but it won't work until the
providers that contain it are watchable (which this change adds).
There is a fair but of concurrent code in this change, it would
be great to have another pair of eyes to check I don't have
races and other bugs here.