
Improve connector failure handling logic #85

Merged: 6 commits merged into vesoft-inc:master on May 22, 2023

Conversation

@linhr (Contributor) commented Apr 25, 2023

What type of PR is this?

  • bug
  • feature
  • enhancement

What problem(s) does this PR solve?

Issue(s) number

N/A

Description

We have found a few edge cases where the connector does not handle failures gracefully. This PR tries to make the implementation more robust, based on our observations from running the connector in production over the past few months.

How do you solve it?

We have made the following changes to the connector:

  1. Support retries in the executor.
  2. Support raising exceptions and letting the task fail fast when there are unrecoverable errors. (This behavior is configurable; see the configuration options below.)
  3. Support renewing the session when the execution fails. This fixes Session not existed! #83.
  4. Do not return and store the error message from the NebulaBatchExecutor.executeBatch() method. This prevents the Java heap from growing out of bounds when there are a large number of correlated errors.
  5. Polish various error messages in the log and exceptions.

We introduce three new options for the Table API connector (an example follows the list):

  • max-retries: Maximum number of retries in the execution. The default value is 3.
  • retry-delay-ms: The delay between retries, in milliseconds. The default value is 1000.
  • failure-handler: The failure handling mode when execution retries are exhausted. The valid values are (1) ignore, which skips the batch that fails, and (2) fail, which fails the task. When the task fails, Flink may restart it depending on how the job is configured; this also allows for detection and manual intervention when there is an unrecoverable error (due to corrupted data, an unhealthy NebulaGraph cluster, a bad network connection, etc.). The default value is ignore, which is consistent with the current implementation, so the option is backward compatible.
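
For illustration, a Table API sink using these options might be declared as in the sketch below. Only max-retries, retry-delay-ms, and failure-handler are the options introduced by this PR; the connector identifier, the schema, and the rest of the example are assumptions made for the sketch.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class NebulaSinkOptionsExample {
        public static void main(String[] args) {
            TableEnvironment tableEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());

            // Only max-retries, retry-delay-ms and failure-handler come from this PR;
            // the connector identifier and the other settings are placeholders.
            tableEnv.executeSql(
                    "CREATE TABLE person_sink (\n"
                            + "  vid STRING,\n"
                            + "  name STRING\n"
                            + ") WITH (\n"
                            + "  'connector' = 'nebula',\n"
                            + "  'max-retries' = '3',\n"
                            + "  'retry-delay-ms' = '1000',\n"
                            + "  'failure-handler' = 'fail'\n"
                            + ")");
        }
    }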

The DataStream connector can be configured similarly using ExecutionOptions.

Special notes for your reviewer, ex. impact of this fix, design document, etc:

N/A

@codecov-commenter

Codecov Report

Patch coverage: 71.27% and project coverage change: +1.88% 🎉

Comparison is base (6b3cad3) 65.12% compared to head (a8cc8d1) 67.00%.

❗ Current head a8cc8d1 differs from pull request most recent head 2e86172. Consider uploading reports for the commit 2e86172 to get more accurate results


Additional details and impacted files
@@             Coverage Diff              @@
##             master      #85      +/-   ##
============================================
+ Coverage     65.12%   67.00%   +1.88%     
- Complexity      309      330      +21     
============================================
  Files            53       55       +2     
  Lines          1878     1961      +83     
  Branches        167      170       +3     
============================================
+ Hits           1223     1314      +91     
+ Misses          570      563       -7     
+ Partials         85       84       -1     
Impacted Files Coverage Δ
....flink/connector/nebula/catalog/NebulaCatalog.java 29.29% <0.00%> (ø)
...link/connector/nebula/sink/NebulaSinkFunction.java 66.66% <ø> (ø)
...ink/connector/nebula/source/NebulaInputFormat.java 68.75% <0.00%> (ø)
...e.flink/connector/nebula/utils/NebulaConstant.java 95.00% <ø> (ø)
...connector/nebula/sink/NebulaBatchOutputFormat.java 54.90% <54.09%> (+7.75%) ⬆️
.../nebula/sink/NebulaTableBufferReducedExecutor.java 88.57% <80.00%> (-1.43%) ⬇️
...ink/connector/nebula/sink/NebulaBatchExecutor.java 83.33% <83.33%> (ø)
...connector/nebula/sink/NebulaEdgeBatchExecutor.java 87.09% <100.00%> (+17.09%) ⬆️
...nnector/nebula/sink/NebulaVertexBatchExecutor.java 86.66% <100.00%> (+17.43%) ⬆️
...nnector/nebula/statement/EdgeExecutionOptions.java 92.55% <100.00%> (+1.08%) ⬆️
... and 5 more

... and 3 files with indirect coverage changes



numPendingRow = 0;
break;
} catch (Exception e) {
LOG.error(String.format("write data error (attempt %s)", i), e);
Contributor:

Would it be better to use LOG.warn rather than LOG.error, and print an error only when maxRetries is used up and failOnError is false?
Btw, use %d for the int i.

Contributor Author:

Good point! I've fixed this.
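
For reference, the suggested form of the log statement inside the catch block above would look roughly like the line below; this is a sketch using the variables from the snippet under review, not necessarily the exact merged code.

    LOG.warn(String.format("write data error (attempt %d)", i), e);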

if (failOnError) {
throw e;
}
} else if (i + 1 <= maxRetries) {
Contributor:

It looks like the condition (i + 1 <= maxRetries) is useless.
Could you please explain it?

Contributor Author:

Yeah, I think you're right. I've updated the logic and removed this condition.

}
// We do not know whether the failure was due to an expired session or
// an issue with the query, so we renew the session anyway to be more robust.
renewSession();
Contributor:

We should get the ResultSet from nebulaBatchExecutor.executeBatch(session) and check its error code. If the ResultSet's error code is E_SEMANTIC_ERROR or E_SYNTAX_ERROR, there's no need to retry.

Contributor Author:

Good point! I've updated the logic to support this.
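
Taken together, the behavior discussed in this thread amounts to a retry policy roughly like the sketch below. This is a generic illustration, not the code merged in this PR; all names (writeBatch, renewSession, FailureClassifier, and so on) are placeholders, and the unrecoverable-error check is where error codes such as E_SEMANTIC_ERROR and E_SYNTAX_ERROR would stop further retries.

    import java.util.concurrent.Callable;

    // Generic sketch of the retry policy discussed above; not the merged code.
    public final class RetryingWriteSketch {

        /** Decides whether a failure can possibly succeed on retry. */
        public interface FailureClassifier {
            boolean isUnrecoverable(Exception e);
        }

        public static void runWithRetries(Callable<Void> writeBatch,
                                          Runnable renewSession,
                                          FailureClassifier classifier,
                                          int maxRetries,
                                          long retryDelayMs,
                                          boolean failOnError) throws Exception {
            for (int attempt = 0; attempt <= maxRetries; attempt++) {
                try {
                    writeBatch.call();
                    return; // success
                } catch (Exception e) {
                    // Warn on intermediate failures; only the final outcome is fatal.
                    System.err.printf("write data error (attempt %d): %s%n", attempt, e);
                    boolean lastAttempt = attempt == maxRetries;
                    if (classifier.isUnrecoverable(e) || lastAttempt) {
                        if (failOnError) {
                            throw e; // fail fast and let Flink handle the task failure
                        }
                        return; // ignore: skip this batch and keep the task running
                    }
                    // Renew the session so the next attempt starts from a clean state.
                    renewSession.run();
                    Thread.sleep(retryDelayMs);
                }
            }
        }
    }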

@linhr (Contributor Author) commented May 18, 2023

@Nicole00 Thanks for your review! I've addressed the comments accordingly. Let me know if there is more feedback on this PR.

cc @wey-gu

@linhr requested a review from Nicole00 on May 18, 2023 at 08:33
if (failOnError) {
throw e;
}
renewSession();
Contributor:

I think the only condition that requires renewSession is when there is a SESSION_INVALID or SESSION_TIMEOUT error code.

  • For IOException, we do not need to renew the Session but only the Session's Connection; then we can reduce the re-auth operations.
  • For the EXECUTION_ERROR error code, the cause is on the storaged side, so we can just re-execute with the same Session.

Contributor Author:

@Nicole00 Yeah, I can see that there may be situations where renewing the session is unnecessary, but here I took a simple approach to make sure the subsequent operation always starts in a clean state.

  • For IOException, right now we use the NebulaPool.getSession() method to get the session. I looked at its implementation and it internally manages a pool of connections. There may be opportunities to avoid unnecessary authentication, but that may require us to manage sessions/connections in the Flink connector itself. This is probably a bigger body of work. Maybe this is something we can consider in the future, or something that can be improved inside the NebulaGraph Java client?
  • For E_EXECUTION_ERROR and many other error codes, I'm worried about the situation of "partial success" even when the final outcome is an error. For example, there may be a temporary issue with one of the storaged services while the others function normally. I've also seen the session time out while a slow query is still being executed in storaged. This is problematic in Flink applications where we process a mix of insert/update/delete events, and concurrent writes to the same key may result in out-of-order processing and corrupted data at the sink. Therefore, I'd like to renew the session anyway, which proactively kills the existing session (and any queries that are still running), so that we rule out the possibility of concurrent queries. In other words, in Flink I'd prefer that there is only one query running for the session attached to each Flink task, and I'd be extremely careful about session reuse after an error.

In general, I think the current "catch-all" session renew logic does not affect the correctness of error handling, and it's robust due to its simplicity. There may be situations where some renewals can be avoided, but we could treat this as an optimization to revisit in some future PR. What do you think?

Contributor:

@linhr Yeah, you are right, renewing the session anyway is a correct and the simplest way to handle a query error, and it is also the roughest way 😁.

  • The Java client indeed has a retry mechanism for IOException; we need to configure reconnect as true when calling getSession (see the sketch after this list), so we do not need to handle this in the Flink connector.
  • As for the concurrent queries you mentioned, they do not exist here, because the session does not support concurrent queries and the client has a synchronized lock on the execute interface.
  • Finally, I think it's OK to optimize the re-execute logic in the future; maybe we will replace the Java client Session with SessionPool, which already handles re-execution, and we would just need to provide some configs.
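
For context, here is a minimal sketch of how the reconnect flag mentioned above is passed, based on the Java client's NebulaPool.getSession(user, password, reconnect) signature; the graphd address and credentials are placeholders for the example.

    import java.util.Collections;

    import com.vesoft.nebula.client.graph.NebulaPool;
    import com.vesoft.nebula.client.graph.NebulaPoolConfig;
    import com.vesoft.nebula.client.graph.data.HostAddress;
    import com.vesoft.nebula.client.graph.net.Session;

    public class ReconnectExample {
        public static void main(String[] args) throws Exception {
            // The graphd address and credentials below are placeholders.
            NebulaPool pool = new NebulaPool();
            pool.init(Collections.singletonList(new HostAddress("127.0.0.1", 9669)),
                    new NebulaPoolConfig());
            // Passing reconnect = true asks the client to retry the underlying
            // connection on IOException instead of failing immediately.
            Session session = pool.getSession("root", "nebula", true);
            session.release();
            pool.close();
        }
    }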

Contributor Author:

@Nicole00 Thanks for the comment! Yeah it would be great to further improve the code in the future using the built-in error handling support in the Java client.

Regarding the concurrent query, let me try to explain the situation a bit more. If the previous query fails, it may not have failed cleanly. For example, when the query times out, we get the error and the lock is released. At this point we can run another query to insert the same data using the same session. However, the previous failed query is still being executed, so we get concurrent queries. Since we do not have transactional writes and the previous execution is not rolled back, the actual data finally written is not well defined. This is of course an edge case, but I was concerned that it may lead to corrupted data, so I renewed the session to be safe.

Happy to discuss!

@Nicole00 (Contributor) left a comment

Wonderful PR! Thanks @linhr for your contributions; happy to discuss with you.

@Nicole00 merged commit 24a7514 into vesoft-inc:master on May 22, 2023
@wey-gu commented May 22, 2023

Thank you @linhr for the contribution, it's great to have you in the NebulaGraph community!

@linhr deleted the executor-failure-handling branch on May 23, 2023 at 10:57
Development

Successfully merging this pull request may close these issues.

Session not existed!
4 participants