[GOBBLIN-1984] Add consensus flowExecutionId to FlowSpec to use for compilation #3857
tests on my fork umustafi#17 |
@@ -122,7 +119,7 @@ public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Conf
  sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
  SharedFlowMetricsSingleton.CompiledState.SKIPPED);
  Instrumented.markMeter(sharedFlowMetricsSingleton.getSkippedFlowsMeter());
- if (!isScheduledFlow((FlowSpec) spec)) {
+ if (!((FlowSpec) spec).isScheduled()) {
Is there a scenario where `spec` isn't a `FlowSpec` here? Maybe it makes sense for the function to require a `FlowSpec` so we don't need to keep casting.
Good callout, from checking the code we only call these functions with a FlowSpec and otherwise throw an error. Changing it to FlowSpec.
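A minimal sketch of the signature change discussed here, using hypothetical stand-in types rather than Gobblin's real `Spec` and `FlowSpec` classes. The helper name `isNotScheduled` is illustrative only; the point is that narrowing the parameter type removes the downcast and moves the check to compile time.

```java
// Hypothetical stand-ins for Gobblin's Spec and FlowSpec; not the real classes.
interface Spec {}

final class FlowSpec implements Spec {
  private final boolean scheduled;

  FlowSpec(boolean scheduled) {
    this.scheduled = scheduled;
  }

  boolean isScheduled() {
    return scheduled;
  }
}

final class ConcurrentExecutionHelper {
  // Before: accepts any Spec, so the body must downcast and can fail at runtime
  // with a ClassCastException if a non-FlowSpec is ever passed in.
  static boolean isNotScheduledLoose(Spec spec) {
    return !((FlowSpec) spec).isScheduled();
  }

  // After: the signature states the requirement, so no cast is needed and a
  // wrong argument type is rejected by the compiler.
  static boolean isNotScheduled(FlowSpec flowSpec) {
    return !flowSpec.isScheduled();
  }
}
```

Call sites that already hold a `FlowSpec` are unaffected by the change; call sites that only hold a `Spec` now have to cast once, at the boundary, instead of repeatedly inside the helper.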
Codecov Report
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #3857      +/-   ##
============================================
- Coverage     47.59%   44.62%    -2.98%
+ Complexity    11069     2998     -8071
============================================
  Files          2160      671     -1489
  Lines         85480    26255    -59225
  Branches       9478     2657     -6821
============================================
- Hits          40687    11716    -28971
+ Misses        41107    13546    -27561
+ Partials       3686      993     -2693
* @param key
* @param value
*/
public void addProperty(String key, String value) {
to ensure these two (now modifiable) fields only ever change in lockstep, instances now require synchronization around all access of either field
I added the `volatile` keyword and a note about accessing both fields inside a `synchronized` block under multi-threaded access (which is unlikely, since we'd probably only access one, not both). Using the `synchronized` keyword on the two functions results in a number of `Inconsistent synchronization of org.apache.gobblin.runtime.api.FlowSpec.config; locked 50% of time` errors, so I had to remove the `@Data` annotation for the class and would have to add separate `@Getter/Setter/toString` etc. annotations for all the fields to make only `Config/ConfigAsProperties` synchronized. I don't think this is that useful, so I made the compromise above. Let me know.
Probably overkill but I think setting a rw lock would handle this right?
suggested to set `configAsProperties` and `config` as `private` and `volatile`, so one should always be forced to use this method to update them
This will also remove the findBugsMain error, because by annotating them `volatile` we are telling it that these fields can be changed from other threads.
I updated the fields to be private so they are only updated through the `addProperty` method, and because we don't expect this scenario to occur, I don't want to overcomplicate it with a rw lock for now. Future developers should be able to quickly notice this from the `volatile` keyword and the documentation.
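The compromise reached in this thread can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in the PR: `MutableFlowConfig` is a hypothetical class, and a plain unmodifiable `Map` stands in for the typesafe `Config` object so the example needs only the JDK.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical holder mirroring the two views kept by FlowSpec.
final class MutableFlowConfig {
  // private: the only way to change either field is addProperty.
  // volatile: a reader on another thread sees the latest reference published.
  private volatile Map<String, String> config = Collections.emptyMap();
  private volatile Properties configAsProperties = new Properties();

  // synchronized: two concurrent writers cannot interleave their updates,
  // so the two views only ever change together.
  public synchronized void addProperty(String key, String value) {
    Map<String, String> updatedConfig = new HashMap<>(config);
    updatedConfig.put(key, value);

    Properties updatedProps = new Properties();
    updatedProps.putAll(configAsProperties);
    updatedProps.setProperty(key, value);

    this.config = Collections.unmodifiableMap(updatedConfig);
    this.configAsProperties = updatedProps;
  }

  // Independent accessors: each is safe on its own, but a reader calling both
  // may observe one view from before a write and the other from after it.
  public Map<String, String> getConfig() {
    return config;
  }

  public Properties getConfigAsProperties() {
    return configAsProperties;
  }
}
```

The trade-off is the one named in the thread: single-field reads stay cheap and lock-free, while a caller that needs both views to agree has to take the instance lock itself.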
@@ -276,8 +277,12 @@ protected void submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction,
try {
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
should `FlowCatalog.getSpecs` override w/ a covariant return type? i.e. the `FlowSpec` in:
@Override
public FlowSpec getSpecs(URI uri) throws SpecNotFoundException {
if you're wondering how Java knows which to choose at runtime: given this is still an `@Override`, per usual it's chosen dynamically based on the runtime type.
the difference here is the more-specific typing in cases where the `SpecCatalog` derived type can be guaranteed statically.
to illustrate, these two are legal:
URI uri = ...
SpecCatalog sc = ...
Spec s = sc.getSpecs(uri);
and
FlowSpecCatalog fsc = ...
FlowSpec fs = fsc.getSpecs(uri);
but this does NOT type-check:
SpecCatalog sc = ...
FlowSpec fs = sc.getSpecs(uri);
it not only saves us typing on casting, but, in situations where we've already successfully cast to `FlowSpecCatalog`, it precludes failure from a subsequent `ClassCastException` when casting to `FlowSpec`
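The snippets above can be assembled into a self-contained sketch. All types here are hypothetical stand-ins with the same names as the Gobblin classes under discussion; the real interfaces carry many more methods, and the in-memory `Map` storage is an assumption made only to keep the example runnable.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins; not the real Gobblin types.
interface Spec {
  URI getUri();
}

final class FlowSpec implements Spec {
  private final URI uri;

  FlowSpec(URI uri) {
    this.uri = uri;
  }

  @Override
  public URI getUri() {
    return uri;
  }
}

class SpecNotFoundException extends Exception {
  SpecNotFoundException(URI uri) {
    super("No spec found for " + uri);
  }
}

interface SpecCatalog {
  Spec getSpecs(URI uri) throws SpecNotFoundException;
}

final class FlowCatalog implements SpecCatalog {
  private final Map<URI, FlowSpec> specs = new HashMap<>();

  void put(FlowSpec spec) {
    specs.put(spec.getUri(), spec);
  }

  // Covariant override: the declared return type narrows from Spec to FlowSpec.
  // It is still the same overridden method, so dispatch stays dynamic.
  @Override
  public FlowSpec getSpecs(URI uri) throws SpecNotFoundException {
    FlowSpec spec = specs.get(uri);
    if (spec == null) {
      throw new SpecNotFoundException(uri);
    }
    return spec;
  }
}
```

With this in place, a caller holding a `FlowCatalog` reference gets a `FlowSpec` back without a cast, while a caller holding only a `SpecCatalog` reference still receives a `Spec`, exactly as in the three cases listed above.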
I updated this method but left the other methods that return `Collection<Spec>` intact, to avoid iterating through each one to cast it to a `FlowSpec`.
*/

/** Flow config as a typesafe config object which can be replaced */
volatile Config config;
Probably a sign that we should reduce the API if it's being used like this, but currently I can't think of a scenario where we have concurrent read or updates of the same flowspec.
I agree, I don't really expect a scenario with concurrent reads/updates so I am marking it as volatile to indicate I'm aware there could be a synchronization issue but don't want to over engineer a solution.
package-private `volatile` is atypical. I like the `private volatile` + accessor that arjun suggested.
/** Flow config as a typesafe config object */
final Config config;
/* Note that since getConfig() and getConfigAsProperties() are independent accessors, `volatile` doesn't ensure a
* consistent view between them. In a multi-threaded scenario one should use the following access mechanism:
change it to
Note that since getConfig() and getConfigAsProperties() are independent accessors, in a multi-threaded scenario one should use the following access mechanism:
actually you can also provide a getMethod(), which will return only when `addProperty` is not being used. This way, users will not have to use `synchronized`
/* Note that since getConfig() and getConfigAsProperties() are independent accessors, `volatile` doesn't ensure a
* consistent view between them. In a multi-threaded scenario one should use the following access mechanism:
* FlowSpec fs = ...
* synchronized (fs) {
Suggesting that users synchronize over the entire `FlowSpec` does not look good.
Also, calling `getConfig` and `getConfigAsProperties` and comparing both looks unnecessary.
Both fields can momentarily have different values, but that's not a problem as long as only the synchronized `addProperty` method has write access to them.
I do agree it would be overkill in the general case, wherein the user is only getting one or the other of the two. there's no need for a single accessor to synchronize on the instance, given `volatile`.
what I believe urmi's suggesting is to lock the object if one wants to access both. this ensures both sides agree. perhaps such clarification could be added to the comment "In a multi-threaded scenario one should use the following access mechanism:"?
it sounds overly broad as phrased above, given "a MT scenario" applies to all our code everywhere.
the other important thing to mention is that the `synchronized` block should be just large enough to obtain the two, so it doesn't keep the object locked.
the other alternative would be to add a combo method to this `FlowSpec` derived class:
/** use for consistency between the two... */
public synchronized Pair<Config, Properties> getConfigAndProperties() {
// ultimate correctness would be to return a defensive copy of `Properties`
}
would this perhaps be clearer, to provide a ready-made capability, rather than a long comment w/ usage advice?
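One way the combo method could be filled in is sketched below. Several things here are assumptions made to keep the example JDK-only: `ConsistentFlowConfig` is a hypothetical class, an unmodifiable `Map` stands in for `Config`, and `Map.Entry` stands in for the `Pair` type named in the comment.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical holder; every access goes through a synchronized method.
final class ConsistentFlowConfig {
  private Map<String, String> config = Collections.emptyMap();
  private final Properties configAsProperties = new Properties();

  public synchronized void addProperty(String key, String value) {
    Map<String, String> updated = new HashMap<>(config);
    updated.put(key, value);
    config = Collections.unmodifiableMap(updated);
    configAsProperties.setProperty(key, value);
  }

  /**
   * Returns both views under one lock so they are guaranteed to agree.
   * The Properties half is a defensive copy, so callers cannot mutate
   * the internal state through it.
   */
  public synchronized Map.Entry<Map<String, String>, Properties> getConfigAndProperties() {
    Properties copy = new Properties();
    copy.putAll(configAsProperties);
    return new SimpleImmutableEntry<>(config, copy);
  }
}
```

The lock is held only for the duration of the copy, which matches the advice above about keeping the synchronized region small.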
in fact, the new potential to mutate via `addProperty` means that `getProperties()` should make a defensive copy to be absolutely correct.
as a reflection, there really is a lot to worry about once classes become mutable! although I see why it may be justified here, this complexity/headache is the reason I personally strive so hard to build immutable and state-less abstractions.
(in that model, things can still change, but when they do, a successor instance is created and the original remains safely unchanged.)
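The immutable alternative described in the parenthetical might look like the following sketch. `ImmutableFlowConfig` and `withProperty` are hypothetical names chosen for illustration; nothing like this is part of the PR, which deliberately updates the existing `FlowSpec` instead.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable holder: no field ever changes after construction,
// so no volatile, synchronized, or defensive copies on read are needed.
final class ImmutableFlowConfig {
  private final Map<String, String> values;

  ImmutableFlowConfig(Map<String, String> values) {
    // Copy on the way in so later changes to the caller's map cannot leak in.
    this.values = Collections.unmodifiableMap(new HashMap<>(values));
  }

  /** Returns a successor carrying the extra property; this instance is left unchanged. */
  ImmutableFlowConfig withProperty(String key, String value) {
    Map<String, String> next = new HashMap<>(values);
    next.put(key, value);
    return new ImmutableFlowConfig(next);
  }

  String get(String key) {
    return values.get(key);
  }
}
```

The cost is that every holder of the old reference must be handed the successor explicitly, which is part of why the thread settles on in-place mutation here.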
I updated the comment. I can add the combo method but it won't require access only through the combo method so I'd have to leave the comment in anyway.
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
In a previous change, we noticed the compiled `flowExecutionId` was different from the one during execution and tried to fix it by updating the `flowMetadata` used to send GTE relating to compilation. See problematic patch Fix FlowSpec Updating Function #3823.
We succeeded in updating the compilation event to use the `flowExecutionId` but did not update the `FlowSpec` used by `Compiler` to create the `Dag` which is used during the execution phase of flow management. The `compiler` retrieves a `flowExecutionId` if one exists in the `flowSpec` (which it does for adhoc flows), otherwise generates a new one (code). We therefore need to modify the `FlowSpec` with the consensus `flowExecutionId` before forwarding it to the `DagManager`.
Note I considered adding a `flowExecutionId` field to the `FlowSpec` class but we have integrated the idea of retrieving flowExecutionId from config so deeply that it would be hard to change all the places that access the id through config. Additionally, I've decided against creating a new `FlowSpec` (instead updating the existing one) since we only use the `FlowSpec` for compilation, and in creating a new `FlowSpec` it's important to make sure it's constructed correctly to avoid ser/deser issues.
Tests
Updates validation helper tests and adds FlowSpec updateProperty test
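As a rough illustration of the behaviour such a test exercises, the sketch below models the rule from the description: the compiler reuses a `flowExecutionId` found in the flow's config and generates one otherwise, so writing the consensus id into the config beforehand makes compilation and execution agree. Everything here is an assumption for illustration: `FlowExecutionIds` is a hypothetical helper, the key string is a guess at the config key, and generating an id from the current time is a placeholder for whatever the compiler actually does.

```java
import java.util.Properties;

// Hypothetical helper; not part of Gobblin.
final class FlowExecutionIds {
  // Assumed key name, used here for illustration only.
  static final String FLOW_EXECUTION_ID_KEY = "flow.executionId";

  // Models the described compiler rule: reuse the id if the config has one,
  // otherwise generate a fresh one (placeholder: current time in millis).
  static long resolveFlowExecutionId(Properties flowConfig) {
    String existing = flowConfig.getProperty(FLOW_EXECUTION_ID_KEY);
    return existing != null ? Long.parseLong(existing) : System.currentTimeMillis();
  }

  // Writes the agreed-upon id into the config before compilation,
  // so the resolve step above picks it up instead of generating a new one.
  static void applyConsensusId(Properties flowConfig, long consensusId) {
    flowConfig.setProperty(FLOW_EXECUTION_ID_KEY, Long.toString(consensusId));
  }
}
```

A test along these lines would set the consensus id on the spec, compile, and assert that the id seen after compilation equals the one that was set.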
Commits