Add reminder service with empty sendReminders logic #2638
Conversation
Force-pushed from e6945fb to 435be78
Force-pushed from 5e05a0f to 6571395
Thanks for breaking this up, and sorry it took me a while to get around to the review.
I'm trying to figure out how we can make Minder and Reminder as similar as possible, so that it's easier for people in one context to get into the other as well.
cmd/reminder/app/root.go
Outdated
	os.Exit(1)
}

err = reminderconfig.ValidateConfig(cfgFileData)
I have a general preference for having reasonable defaults rather than throwing an error (modulo the above "null keys when we need a value" check). It seems like we have two different "check config" endpoints here -- can we consolidate one into the other?
(Ideally, the code for initConfig would look similar for minder, server and reminder...)
modulo the above "null keys when we need a value" check
The null values check serves a different purpose: it is there to prevent corruption of the viper key-value store, and it is present in all the other CLIs too.
can we consolidate one into the other?
I would rather not. The validation portion has a different method signature than the null-value check, and working around that would break the pattern across CLIs rather than adding any uniformity.
Ideally, the code for initConfig would look similar for minder, server and reminder...)
It is almost the same, except reminder does validation.
sql_connection:
  dbhost: "reminder-event-postgres"
  dbport: 5432
  dbuser: postgres
  dbpass: postgres
  dbname: reminder
  sslmode: disable
Oh, this is interesting... by using events here, we're requiring the use of the watermill-SQL driver in Minder for reminder to work (the default is go-channel). That's probably okay, but I hadn't put it together as a limitation of this approach until now.
Nope, it's not like that. Minder's message queue system would be untouched; we would just add a new subscriber to the server for reminder events. Minder can continue to run using go-channel, and would connect to reminder using a separate config.
I'm confused as to what this event configuration is for if it's not being used to trigger events in the minder server. Is this a standalone, SQL-only eventing system just for reminder?
Yes, it is a standalone publishing system for reminder that minder would subscribe to.
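For context, a rough sketch of what a standalone Watermill SQL publisher over the reminder-event Postgres database could look like; the helper name, config details, and watermill-sql import path/version are assumptions, not the setupSQLPublisher implementation in this PR.

package reminder

import (
	"database/sql"

	"github.com/ThreeDotsLabs/watermill"
	watermillsql "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql"
)

// newSQLPublisher wires a Watermill publisher that writes events to Postgres;
// Minder would then run a matching SQL subscriber against the same database.
func newSQLPublisher(db *sql.DB, logger watermill.LoggerAdapter) (*watermillsql.Publisher, error) {
	return watermillsql.NewPublisher(
		db,
		watermillsql.PublisherConfig{
			SchemaAdapter:        watermillsql.DefaultPostgreSQLSchema{},
			AutoInitializeSchema: true, // create the messages table if it does not exist
		},
		logger,
	)
}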
Does this mean that we'll have two different messaging SQL databases, or was your thought to have this end up landing on the same database we use for watermill messaging in Minder today?
Hmm, interesting.
We could theoretically run multiple reminder servers which would interoperate with each other.
I can see this happening, but I don't have the design pattern for it, and it isn't required now, so I won't brainstorm on that side.
The idea would be that we could run multiple servers for high availability. I agree that it's not necessary for right now, as long as we make sure that it's not strictly required to only run one Reminder for correctness.
(You good with storing in memory for now (cache) ?)
Yes.
Also, do we want to modify type RepoReconcilerEvent struct to have a source string field, for visibility and to help differentiate whether the event was generated by reminder or minder? (For topic: TopicQueueReconcileRepoInit)
Rather than putting it in the type RepoReconcilerEvent struct, can you put something like periodic=true or trigger=revisit in the message metadata?
Ideally, we'd have a good split between routing metadata (message subject, event type, etc) and the message contents (diff of the change, for example). Having the metadata in the outer envelope allows us to generically do things like 2-lane queues (handle revisits at lower priority than live notifications) and record success rates for revisits separately. The generic top-level handling layer should not know about the payload schema -- that's for the actual endpoint to decode.
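For illustration, a minimal Go sketch of the metadata approach; the helper name and topic handling are assumptions, not Minder's actual publishing code:

package reminder

import (
	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
)

// publishRevisit marks the event as a periodic revisit in the Watermill
// envelope, so generic routing layers can see it without decoding the payload.
func publishRevisit(pub message.Publisher, topic string, payload []byte) error {
	msg := message.NewMessage(watermill.NewUUID(), payload)
	msg.Metadata.Set("trigger", "revisit") // routing metadata, not payload schema
	return pub.Publish(topic, msg)
}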
Adding cursors to the db would look something like:
CREATE TABLE reminder_project_cursor (
    id SERIAL PRIMARY KEY, -- Some sort of ID to differentiate between different reminders
    cursor TEXT NOT NULL
);

CREATE TABLE reminder_repository_cursor (
    id SERIAL PRIMARY KEY, -- Some sort of ID to differentiate between different reminders
    project_id UUID NOT NULL,
    provider TEXT NOT NULL,
    cursor TEXT NOT NULL,
    FOREIGN KEY (project_id, provider) REFERENCES providers(project_id, name) ON DELETE CASCADE,
    UNIQUE (project_id, provider)
);

Using this approach every reminder service would have some unique identifier associated with it. I had a few concerns:
- Are we simplifying by storing in db or complicating it more than required? (With in-memory cache and all that stuff)
- Do we even need to run multiple reminder instances? You told me a lot of other things would break before we scale.
- If yes, wouldn't it be simpler to have a reminder service operate on a shard? Or set a limit somehow? (Taking a wild guess here)
- Are we over-engineering this 😬 ?
Do we need fairness at both the project and the repo level? My naive thought would be to iterate across all the entities -- projects which had more repos registered would get more "revisits / day" than smaller projects, but that seems appropriate. We probably want to limit how many concurrent reconciliations are happening in a project, but that might make more sense to do at the Minder level, since we could have a project that genuinely gets changes to 50 repos at the same time.
It's hard to guarantee fairness until we know what fairness looks like. 😁
In terms of running multiple reminder instances, my goal is that we should be able to upgrade Reminder as follows:
- Reminder v.123 is running one instance
- We start a Reminder v.124
- We shut down Reminder v.123
This is easy to do with a Kubernetes Deployment with maxSurge: 1, and avoids getting into a situation where v.124 doesn't work, but we've corrupted state needed for v.123 to operate, so we end up needing to do manual maintenance. This is also the model we use with the core Minder binary, so it's less surprising than having a different model. As long as we can use that manner of deployment, I'm happy with many different solutions:
a. Iterate over the database several times faster than the reminder interval, but honor some pacing on actually sending the events. In this case, we might not need to keep a cursor at all.
b. Add a database table to track the Reminder cursors.
c. Use "continuation messages" in Watermill to describe what we just processed. e.g. at the end of processing up to row N, send a "now process >N" message, then acknowledge the "now process >N-10" message that was in queue earlier. If there's no message, start at a random point.
Do we need fairness at both the project and the repo level? My naive thought would be to iterate across all the entities -- projects which had more repos registered would get more "revisits / day" than smaller projects, but that seems appropriate. We probably want to limit how many concurrent reconciliations are happening in a project, but that might make more sense to do at the Minder level, since we could have a project that genuinely gets changes to 50 repos at the same time.
It's hard to guarantee fairness until we know what fairness looks like. 😁
Exactly. My main idea behind this was to ensure fair sharing among projects. If we reconcile a large number of entities in a single project, we might hit the rate limits (improbable, but possible). The current model (which is yet to be pushed) works on the logic of picking a minimum number of repos from a project (to ensure some fair share) and picking additional repos if there are more spots (a rough sketch of this idea follows below).
Let's discuss cursors in a Discord thread. I would also like reminder to run as a Deployment rather than a StatefulSet.
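A rough sketch of the fair-share batching idea described above, with hypothetical types and without the cursor handling; this is not the batch-building code in this PR.

package reminder

// buildBatch takes at most maxPerProject repos from each project first, then
// fills any remaining spots from projects that still have repos left, so one
// large project cannot consume the whole batch (and its rate limit) on its own.
func buildBatch(projectRepos map[string][]string, batchSize, maxPerProject int) []string {
	batch := make([]string, 0, batchSize)
	leftovers := make(map[string][]string)

	// First pass: a bounded share per project.
	for project, repos := range projectRepos {
		take := maxPerProject
		if take > len(repos) {
			take = len(repos)
		}
		batch = append(batch, repos[:take]...)
		if take < len(repos) {
			leftovers[project] = repos[take:]
		}
		if len(batch) >= batchSize {
			return batch[:batchSize]
		}
	}

	// Second pass: use remaining spots for repos that were held back.
	for _, repos := range leftovers {
		batch = append(batch, repos...)
		if len(batch) >= batchSize {
			return batch[:batchSize]
		}
	}
	return batch
}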
internal/config/reminder/config.go
Outdated
if cfg.RecurrenceConfig.BatchSize <
	cfg.RecurrenceConfig.MaxPerProject*cfg.RecurrenceConfig.MinProjectFetchLimit {
	return fmt.Errorf("batch_size %d cannot be less than max_per_project(%d)*min_project_fetch_limit(%d)=%d",
Rather than erroring, I have a slight preference for printing an Error-level message and continuing with a larger BatchSize.
(Also, it feels like we should add these to RecurrenceConfig.Validate, since they deal with only those fields.)
Done.
r.logger.Debug().Msg("storing cursor state")

var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
Why use gob rather than simply JSON?
I read a few benchmark results saying gob is better for large data, so I chose it.
How large is this data going to be? I see that it's stored in a map, so I'm wondering how many keys there will be in the map.
If it's just the two keys, then performance really doesn't matter. If the file is going to be e.g. 100MB, we need to talk more generally about storing state. 😁
It is directly proportional to the number of projects being processed in an iteration, which is configurable, so it can get as large as you'd want.
Why do we need to keep track of all the projects processed in an iteration? It should be fine to re-run a small number of projects, so we could keep just the first (or last) item in the batch.
Binary formats are a pain for humans to work with (we need to build extra tools), so I'd prefer JSON or YAML if possible so we can clean up intermediate state in an emergency with simple tools.
Why do we need to keep track of all the projects processed in an iteration? It should be fine to re-run a small number of projects, so we could keep just the first (or last) item in the batch.
We have a project cursor and a repo cursor for every project we traverse. The repo cursor list grows as we traverse more projects because we only pick a small number of repos from each project, to get a sort of fair-share algorithm and avoid hitting that project's rate limits.
We have to keep track of all repo cursors because, when we return to the same project in subsequent loops, we want to pick the repos that haven't been picked yet.
Binary formats are a pain for humans to work with (we need to build extra tools), so I'd prefer JSON or YAML if possible so we can clean up intermediate state in an emergency with simple tools.
I just had some performance concerns (though they may not matter that much); I'll change it to JSON or YAML.
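A sketch of what the JSON variant could look like, assuming the cursor state is roughly a map[string]string of project/repo cursors; the function name and shape are assumptions, not the final implementation.

package reminder

import (
	"encoding/json"
	"os"
)

// storeCursorStateJSON writes the cursor map as indented JSON instead of gob,
// so the file can be inspected or edited by hand in an emergency.
func storeCursorStateJSON(cursorFile string, cursors map[string]string) error {
	buf, err := json.MarshalIndent(cursors, "", "  ")
	if err != nil {
		return err
	}
	return os.WriteFile(cursorFile, buf, 0600)
}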
Why do we need to keep track of all the projects processed in an iteration? It should be fine to re-run a small number of projects, so we could keep just the first (or last) item in the batch.
We have a project cursor and a repo cursor for every project we traverse. The repo cursor list grows as we traverse more projects because we only pick a small number of repos from each project, to get a sort of fair-share algorithm and avoid hitting that project's rate limits.
If we were to traverse the lists in order of repo ID and rely on Minder to limit reconciliations during contention, would that serve the same result? I'm worried that we only have a partial view of what's going on in terms of quota usage here, so I'd rather have Reminder act as a reliable clock but not get too fancy in terms of trying to push back because Minder will still need to deal with quota exhaustion in other cases. (We had one about a week ago due to poor cache behavior, for example.)
If we were to traverse the lists in order of repo ID and rely on Minder to limit reconciliations during contention, would that serve the same result?
Well, yes, technically we can achieve the same result by just pushing all repos for reconciliation and letting Minder create the batch of what actually gets reconciled. But wasn't the original plan to have this as a separate microservice? I would just like Minder to process the events and return errors if it can't due to rate limits. In that case, those entities would be picked up in the next complete cycle.
	return err
}

return os.WriteFile(r.cfg.CursorFile, buf.Bytes(), 0600)
Do you need to make sure that storeCursorState isn't called concurrently?
It isn't called concurrently. First, reminder is initialized (NewReminder), and then Start is called in a separate goroutine, so it shouldn't be required.
Can you add a comment to the method that it's not safe to call this method from multiple threads?
Done. But only three methods are exposed (Start, Stop, New), so it shouldn't be called from multiple threads unless someone decides to call Start multiple times in different goroutines. Should we prohibit that?
I didn't see storeCursorState being called at all, so I had to guess about where and how the call chain to it happened.
internal/reminder/reminder.go
Outdated
	})
}

func (r *reminder) storeCursorState() error {
I don't see where this is called. Should it be called in the loop from Start?
No, this is not being used right now. It would be used by the batch-building methods. restoreCursorState() is used and present, so it made sense to add this (for completeness and testing).
Force-pushed from 43c00cf to c6ac1db
Just a couple comments, this is looking like a good skeleton to build from.
internal/reminder/reminder.go
Outdated
	r.logger.Error().Err(err).Msg("error restoring cursor state")
}

pub, cl, err := r.setupSQLPublisher(context.Background())
I still think you want to pass in ctx here.
Suggested change:
- pub, cl, err := r.setupSQLPublisher(context.Background())
+ pub, cl, err := r.setupSQLPublisher(ctx)
Force-pushed from c6ac1db to 009a191
Force-pushed from d95eba8 to 43b9177
@evankanderson ping, in case you missed the review request.
Sorry about the delay -- I was chaperoning a solo-parent-with-two-kids trip last week, and we've been crunching a bit in preparation for Open Source Summit. My ability to review will be a bit spotty for the next 3-ish days due to the same, and then I should be able to give this some more full-time attention.
func init() {
	cobra.OnInitialize(initConfig)
	reminderconfig.SetViperDefaults(viper.GetViper())
Why is this in init here, but in initConfig in server/app/root.go?
This won't cause any difference in functionality. It is in init because when we RegisterReminderFlags, we actually look up the defaults from viper, which populates them using struct fields. In the server CLI code, the default values for flags are hardcoded, so this is actually better.
Can you file an issue to move server to this pattern? (Don't do it in this PR, but we should track these cleanup opportunities)
Created #3145
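As a hypothetical illustration of the pattern discussed here (defaults set on viper first, then flag registration reading them back instead of hardcoding values); the "recurrence.interval" key name and helper are assumptions, not this PR's code.

package app

import (
	"github.com/spf13/cobra"
	"github.com/spf13/viper"
)

// registerIntervalFlag registers a flag whose default comes from the viper
// store rather than a hardcoded literal.
func registerIntervalFlag(cmd *cobra.Command) error {
	cmd.Flags().String("interval", viper.GetString("recurrence.interval"),
		"how often reminder builds a new batch")
	return viper.BindPFlag("recurrence.interval", cmd.Flags().Lookup("interval"))
}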
cmd/reminder/app/root.go
Outdated
RootCmd.PrintErrln("Error registering reminder flags")
os.Exit(1)
It looks like we use log.Fatal().Err(err).Msg("...") instead of RootCmd.PrintErrln("..."); os.Exit(1) in server/app/root.go. Can we use the same pattern here (for consistency)?
I did it, but the server CLI uses that pattern in only a place or two; in the rest of the places it uses os.Exit. We specifically set:
RootCmd.SetOut(os.Stdout)
RootCmd.SetErr(os.Stderr)
So it introduces some inconsistency if we want to write logs to a file, but I changed it to use zerolog.
cmd/reminder/app/root.go
Outdated
	os.Exit(1)
}

RootCmd.PersistentFlags().String("config", "", fmt.Sprintf("config file (default is $PWD/%s)", configFileName))
Can we put this flag declaration right after OnInitialize(initConfig) (again, just for consistency, not because it will change behavior)?
Done.
cmd/reminder/app/start.go
Outdated
err = cfg.Validate()
if err != nil {
	var batchSizeErr *reminderconfig.InvalidBatchSizeError
	if errors.As(err, &batchSizeErr) {
		// Update Batch Size in viper store
		updateBatchSize(cmd, batchSizeErr)

		// Complete config is read again to update the batch size.
		cfg, err = config.ReadConfigFromViper[reminderconfig.Config](viper.GetViper())
		if err != nil {
			return fmt.Errorf("unable to read config: %w", err)
		}
	} else {
		return fmt.Errorf("invalid config: %w", err)
	}
}
If we do this as an accessor in config.GetBatchSize(), then we might be able to avoid needing cfg.Validate() at all, right?
(I'm asking because I worry about this block, which is outside of test coverage, growing over time... app/serve.go is also bigger than I'd want.)
I modified the logic to be out of the CLI (so we can test it). Should be fine now, ptal :)
// Try to parse it as a time.Duration
var parseErr error
defaultValue, parseErr = time.ParseDuration(value)
if parseErr == nil {
	return defaultValue, nil
}
This is because we lose the type aliasing at compile-time, right?
It's worth commenting, because it technically also allows us to write noncePeriod = 1h in config/server, which means something very different (it would be 1 billion hours). This seems like an acceptable risk, since this is only setting defaults.
We might also want to figure out how to migrate noncePeriod, but that's a different discussion, and definitely doesn't belong in this PR.
This is because we lose the type aliasing at compile-time, right?
Yes, we cannot get the user-defined type using reflection.
internal/config/reminder/config.go
Outdated
// SetViperDefaults sets the default values for the configuration to be picked up by viper
func SetViperDefaults(v *viper.Viper) {
	v.SetEnvPrefix("reminder")
	v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
We replace - with _ in the server/config.go version of this -- any harm in doing the same here?
Done
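Presumably the resulting call looks something like this (a guess at the change, not the actual diff): both "." and "-" in config keys map to "_" when deriving environment variable names.

package reminder

import (
	"strings"

	"github.com/spf13/viper"
)

// setEnvKeyReplacer maps both "." and "-" in config keys to "_" for env vars.
func setEnvKeyReplacer(v *viper.Viper) {
	v.SetEnvKeyReplacer(strings.NewReplacer(".", "_", "-", "_"))
}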
// InvalidBatchSizeError is a custom error type for the case when batch_size is less than
// max_per_project * min_project_fetch_limit
type InvalidBatchSizeError struct {
	BatchSize            int
	MaxPerProject        int
	MinProjectFetchLimit int
}

func (e *InvalidBatchSizeError) Error() string {
	return fmt.Sprintf("batch_size %d cannot be less than max_per_project(%d)*min_project_fetch_limit(%d)=%d",
		e.BatchSize, e.MaxPerProject, e.MinProjectFetchLimit, e.MaxPerProject*e.MinProjectFetchLimit)
}

// Validate checks that the recurrence config is valid
func (r RecurrenceConfig) Validate() error {
	if r.MinElapsed < 0 {
		return fmt.Errorf("min_elapsed %s cannot be negative", r.MinElapsed)
	}

	if r.Interval < 0 {
		return fmt.Errorf("interval %s cannot be negative", r.Interval)
	}

	if r.BatchSize < r.MaxPerProject*r.MinProjectFetchLimit {
		return &InvalidBatchSizeError{
			BatchSize:            r.BatchSize,
			MaxPerProject:        r.MaxPerProject,
			MinProjectFetchLimit: r.MinProjectFetchLimit,
		}
	}

	return nil
}
I think you could also do this as:
// InvalidBatchSizeError is a custom error type for the case when batch_size is less than
// max_per_project * min_project_fetch_limit
type InvalidBatchSizeError struct {
	BatchSize            int
	MaxPerProject        int
	MinProjectFetchLimit int
}

func (e *InvalidBatchSizeError) Error() string {
	return fmt.Sprintf("batch_size %d cannot be less than max_per_project(%d)*min_project_fetch_limit(%d)=%d",
		e.BatchSize, e.MaxPerProject, e.MinProjectFetchLimit, e.MaxPerProject*e.MinProjectFetchLimit)
}

// Validate checks that the recurrence config is valid
func (r RecurrenceConfig) Validate() error {
	if r.MinElapsed < 0 {
		return fmt.Errorf("min_elapsed %s cannot be negative", r.MinElapsed)
	}

	if r.Interval < 0 {
		return fmt.Errorf("interval %s cannot be negative", r.Interval)
	}

	if r.BatchSize < r.MaxPerProject*r.MinProjectFetchLimit {
		return &InvalidBatchSizeError{
			BatchSize:            r.BatchSize,
			MaxPerProject:        r.MaxPerProject,
			MinProjectFetchLimit: r.MinProjectFetchLimit,
		}
	}

	return nil
}

func (r RecurrenceConfig) GetInterval() time.Duration {
	// Values this low should only be used for testing...
	minInterval := 1 * time.Minute
	if r.Interval < minInterval {
		return minInterval
	}
	return r.Interval
}

func (r RecurrenceConfig) GetBatchSize() int {
	if r.BatchSize < r.MaxPerProject*r.MinProjectFetchLimit {
		return r.MaxPerProject * r.MinProjectFetchLimit
	}
	return r.BatchSize
}

func (r RecurrenceConfig) GetMinElapsed() time.Duration {
	minElapsed := 90 * time.Second // i.e. 1.5 minutes
	if r.MinElapsed < minElapsed {
		return minElapsed
	}
	return r.MinElapsed
}
And then you could avoid needing specific Validate and error-handling code elsewhere.
I feel like this would be too much patching. If the time specified is < 0, it's very fair to give an error. If we patch every user mistake, bad configs might creep into git repos, build scripts, etc. An error would force the user to fix it. Wdyt?
I'm willing to respect that. We'll go with validation here.
func (r *reminder) Stop() {
	if r.ticker != nil {
		defer r.ticker.Stop()
	}
	r.stopOnce.Do(func() {
		close(r.stop)
		r.eventDBCloser()
	})
}
So, a couple things you can do:
- You can return a cancel function from Start. This lets you ensure that the caller can't get a handle on the function before Start is called.
- You can use something like a condition variable or a mutex to protect the fields managed by Stop.
Force-pushed from f6d9810 to f73e03f
Signed-off-by: Vyom-Yadav <jackhammervyom@gmail.com>
Force-pushed from f73e03f to 588571f
I'm willing to merge this and revisit the cursor storage in a subsequent PR if that helps -- I do want to get some progress on this, but I'm hoping that we can simplify the design and amount of state that Reminder is carrying, since it doesn't have direct visibility into things like "how much API quota has this app installation used in the last hour".
With that said, my biggest stand-firm is on making sure that it's simple and reliable to roll forward and back Reminder while it's running, so I'd strongly prefer using a Kubernetes Deployment over a StatefulSet.
@evankanderson Ready when you are. We can discuss enhancements and cursors in a Discord thread for a separate PR.
Summary
Issue #2262
Does not fix the issue, addresses:
Does not address:
Change Type
Testing
Review Checklist: