[Preview] ChangeFeedProcessor: Adds support for manual checkpoint, context, and stream #2331
Conversation
@bartelink Would love to hear your feedback on the proposed APIs in this PR
Firstly, thanks so much for prioritizing this work. I'm really looking forward to getting off a parked 2018-era timebomb, having less need to manage the code and prod instances of apps using two Cosmos SDKs in the same process, and being able to participate more meaningfully in providing proper feedback on the actual codebase the team is maintaining. It seems to me that having a divide between I personally think the delegates are the way to go for the API design. (It should be mentioned that I mainly program in F#, so delegates may feel inordinately comfortable to me compared to the average dev.) Spawning lots of types makes it much harder for a newcomer to grasp the lie of the land and makes the API docs messy (lots of undocumentation boilerplate), i.e. you want to convey that there are two major axes of choice:
Also, thanks for taking the time to write up the overview as you normally do. I watch this repo and scan all issues, with varying degrees of attention depending on time of day, workload, etc. I thought I'd raise this point with regard to terminology though... This is actually the first time that I got what you were talking about with respect to the term While Stream may make sense to you and/or be consistently used across the SDK, I'm not sure that's a universally well-known term for deferring deserialization and/or propagating the document content without strongly typing and/or deserializing it first. Qualifying it, either with a term for the strong type binding/deserialization mechanism and/or with one for the technique of deferring it and instead propagating the raw content, might help learnability of the SDK.
@bartelink Regarding your comment of:
What you mean is that the
Which makes it explicit on the call and Intellisense would not give other options.
CancellationToken cancellationToken);

/// <summary>
/// Delegate to receive the changes within a <see cref="ChangeFeedProcessor"/> execution with manual checkpoint.
I think we need to call out more explicitly that no auto-checkpointing is done at all anymore when using this delegate
Good idea, @bartelink said the same, that is why the Builder is different:
public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder<T>(
string processorName,
ChangeFeedHandler<T> onChangesDelegate);
public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithManualCheckpoint<T>(
string processorName,
ChangeFeedHandlerWithManualCheckpoint<T> onChangesDelegate);
That way, the delegate can only be used if the user is explicitly calling the builder with manual checkpoint.
Is that good enough?
LGTM - Thanks
Thanks for getting this done :)
There will be a new release hopefully this week or next week.
I don't see it in 3.18.0 :( but found it in 3.19.0-preview :)
@fuocor It's in the preview package, not the GA yet. 3.19.0-preview
While I have your attention though... Firstly, kudos to all involved in writing https://docs.microsoft.com/en-us/azure/cosmos-db/how-to-migrate-from-change-feed-library Any hints on how to port conditional parsing of Documents like this? https://github.com/jet/propulsion/blob/629ea0646a4dddfff5f9ca53e44c1a5dbdaa8379/src/Propulsion.Cosmos/EquinoxCosmosParser.fs#L39-L40 I believe if I just used In other words, for the
@bartelink Glad the migration guide is useful; once these new APIs are GAed, I'll update it further with the new APIs. If you were using Document before, you can probably use JObject (same thing)? Have you tried it?
The delegate for taking a checkpoint (tryCheckpointAsync) returns a result and an exception. This contradicts the common async API model in C#, where exceptions are thrown. I don't see a reason why consumers cannot handle the exception in normal C# style instead of having to use C-style error checking.
@bartelink, @fuocor do you have any opinion regarding the checkpoint API similar to @PradSenn?
The API as it is happens to meet my expectation and requirement exactly given that otherwise the very first thing I'd have to do is wrap the invocation in a try/catch. Is there precedent elsewhere in the SDK for this? Having said that, the guidelines do suggest to default to throwing. A final point I'd make is that this particular API, being an advanced one that is intended for people who are going to significant lengths to do explicit checkpoint management in order to achieve better performance, is not the sort where using this newer paradigm is going to cause an issue.
@bartelink - Thanks for the insightful comment! @PradSenn - My motivation for the design was that not all developers remember or do try/catch, while having an API that explicitly says whether there was an error or not, forces the developer to make a conscious decision. Changing it to throwing is rather simple, as @bartelink says, this API is advanced enough that either throwing or with the current API, a user should handle it with enough care. So I guess it's something we can tackle before GA. Could you create an Issue to track it?
I agree with @bartelink.
@PradSenn @bartelink @fuocor - Sent #2488 to change the signature based on the feedback
The
For me, this does not (read: did not; had me wondering what I was missing) convey enough information when porting more complex scenarios using the V2 CFP's Perhaps it could refer to a shard, physical partition, segment, range or some such synonym? I actually think the root problem here is the name. I appreciate based on #1122 that there are reasons to leave it abstract, but perhaps the relationship could be mentioned or illustrated in https://docs.microsoft.com/en-us/azure/cosmos-db/how-to-migrate-from-change-feed-library ? I'm also wondering if there is a replacement for the callback that one could previously use to log when a Lease is assigned?
@bartelink - I will update the migration docs once the APIs GA. We want to expand the CFP beyond the Partition restriction, so keeping the name abstract is a way to ensure that. Obviously from a migration perspective, yes, the LeaseToken will be equivalent initially, but that will not be the case forever. For the callback - I think #2501 is what you are looking for.
Yes I accept your central point, but the Yes #2501 is the correct answer re the callback (I had read that issue but wasn't able to locate it when I was heads down porting). The text in there also does not use this token terminology - I'd again contend that there is missing terminology for 'acquired (leased) segment of a monitored container' ;) (Also I'm puzzling over whether the porting guide should have a 'where has the observer gone' paragraph, i.e. TL;DR all changes get supplied to a single callback and are distinguished by the
Yeah, I agree, the Regarding monitoring, ideally the events will also show the LeaseToken that contextually was there when the event happened. Right now it's mostly a design concept I'm playing with, mostly out of trying to help people self-diagnose issues. Something like that API + docs should be all that is needed.
@bartelink question on the Open/Close of the observer. Are you currently hooking logic (other than telemetry) to those events? If so, which? If it's just telemetry, would a non-programmatic way to access that telemetry (such as something like OpenTelemetry wired up through the SDK) be sufficient if those events were raised there?
I ported the logic in jet/propulsion#113. See the paragraph below the fold for the real answer as to the use case that can't be achieved without an unassigned hook.
In terms of the pure telemetry aspect, I literally can't replicate the logging as it was (clear events as to when a range is assigned and unassigned, being able to identify lease timeouts by seeing storms of those in the log, etc). If the assigned/unassigned events were hookable, I'd definitely use those, but would not be able to thread them into the logging context hierarchy alongside other activity per lifetime of an observer in the same manner. However, I can deal with that, especially as better thought out OT integration can definitely improve on logging of that kind as a source of troubleshooting context and hints.
The lack of Disposal meant I had to do some messy reworking of the consumption pipeline (slightly concealed by the fact that some preparatory work is in preceding PRs). I run an Async pumping loop per source partition/shard/token which:
The lack of Disposal (unassign notifications) means I'd need to add timers to dispose buffers and/or back off if I wanted to retain the same resource consumption over time (imagine there being 100 physical partitions and each consumer being bounced around those over time). (If there's no Disposal event, the lack of an initialization event is not a big difference - I check for initialization of the processing loop for a given assignment and log on first hit.) However, I guess if I'd never had a Dispose hook, I'd probably have come up with a cleaner and more scalable pumping mechanism.
Having said that, the above, while important and slightly painful from a porting perspective, can be worked around. (It should also be noted that it seems the V2 CFP doesn't always trigger Disposal consistently anyway, i.e. the V2 impl may have bugs.)
👉 The thing that an unassigned event can uniquely enable is to be able to recall work (batches read ahead and passed to the processor, but not yet completed and therefore not yet checkpointed) that has been optimistically passed to the processing engine, and/or partially completed. If the CFP reassigns to another node, the current node cannot purge the work and will thus be working in competition with the other node that will next be assigned the partition that got unassigned. Any reporting of assignments/unassignments would only be useful if there was an equivalent sequencing of a) assign b) 0..n changes c) unassign - i.e. providing the info out of band does not really enable the above scenario.
Description
This PR builds on top of the great work that @fuocor did in #888. That PR was stale and not passing tests, but had goodness in the feature it was bringing. I could not cherry-pick from a fork, but acknowledgment will go to the author in the changelog notes 😄
New delegates
We previously had a single delegate to process changes:
In order to address the requests in the linked incidents, we are introducing 4 new delegates:
Container API changes
Stream support
Stream APIs allow customers to send the data to other components without deserializing it in the Change Feed Processor. This is possible through 2 of the new delegates. The most common one handles checkpointing for the user:
And one where the user can manage checkpoints themselves:
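To illustrate what deferring deserialization means for a consumer, here is a minimal sketch that hands the raw payload to another component untouched. RawForwarder and ForwardAsync are hypothetical names, not SDK APIs; the only assumption taken from this PR is that the handler receives the batch of changes as a System.IO.Stream.

```csharp
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of the SDK): forwards a batch of changes
// as raw bytes, without parsing or binding it to a type first.
public static class RawForwarder
{
    public static async Task ForwardAsync(
        Stream changes,
        Stream sink,
        CancellationToken cancellationToken)
    {
        // Copy the payload verbatim; whoever reads the sink decides
        // if and how to deserialize it.
        await changes.CopyToAsync(sink, 81920, cancellationToken);
    }
}
```

A stream handler could call a helper like this with a queue, blob, or socket stream as the sink, leaving deserialization to the downstream component.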
How to leverage manual checkpoint?
There isn't any extra configuration required on the ChangeFeedProcessorBuilder other than using the delegate that exposes the checkpoint logic. Choosing this delegate is an implicit opt-in to the logic.
This enables users to decide when to checkpoint the progress of a lease. Some users buffer changes and checkpoint only once the buffer commits (all the changes have been processed).
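As a minimal sketch of that buffer-then-checkpoint pattern (self-contained, not the SDK's code): SketchContext and BufferingHandler are hypothetical stand-ins, the handler takes a tryCheckpointAsync function returning an (isSuccess, error) pair as discussed in this PR, and "committing" the buffer is reduced to clearing it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the context type; only the lease identifier is modeled.
public sealed class SketchContext
{
    public SketchContext(string leaseToken) { LeaseToken = leaseToken; }
    public string LeaseToken { get; }
}

// Hypothetical handler that buffers changes and checkpoints only after a commit.
public sealed class BufferingHandler
{
    private readonly List<string> buffer = new List<string>();
    private readonly int flushSize;

    public BufferingHandler(int flushSize) { this.flushSize = flushSize; }

    public int CheckpointCount { get; private set; }
    public List<string> Failures { get; } = new List<string>();

    public async Task HandleAsync(
        SketchContext context,
        IReadOnlyCollection<string> changes,
        Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync,
        CancellationToken cancellationToken)
    {
        buffer.AddRange(changes);
        if (buffer.Count < flushSize)
        {
            return; // keep buffering; no checkpoint is attempted for this batch
        }

        // "Commit" the buffer (here it is simply cleared) before checkpointing.
        buffer.Clear();

        var result = await tryCheckpointAsync();
        if (result.isSuccess)
        {
            CheckpointCount++;
        }
        else
        {
            // The checkpoint did not go through; record which lease was affected and why.
            Failures.Add($"Checkpoint failed for lease {context.LeaseToken}: {result.error?.Message}");
        }
    }
}
```

In this sketch a failed checkpoint is only recorded; a real consumer would decide whether to retry, stop processing that lease, or surface the error.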
Either of the 2 delegates that expose the checkpoint mechanism allows for this, via the call to tryCheckpointAsync, like so:
The call can fail, so checking isSuccess is needed to detect a failure, and the returned exception helps to understand it. There are cases where checkpoint can fail due to load balancing (yielding a 412 - Precondition Failed error), or there could be other issues (the lease collection could have been deleted, for example).
What is there in the context?
The context includes information related to the changes delivered to the delegate: the identifier of the lease they belong to (useful when paired with the Container.ChangeFeedEstimator for monitoring), the Headers (which expose RU consumption and the SessionToken for extending the session to other components), and the Diagnostics.
Relevant files for reviewing
This PR is rather large, but mainly due to the refactoring required to support Stream handlers (removing <T> from multiple classes). I also took the opportunity to clean up old code that was never used but had been carried over when we migrated the code from the original CFP.
The most important files are:
Type of change
Closing issues
Closes #1765
Closes #616
Closes #865
Closes #400
Closes #1122