
Improve distributed load #18153

Merged 1 commit into Alluxio:main from the yimin/improve-distributed-load branch on Sep 15, 2023

Conversation

@elega (Contributor) commented Sep 14, 2023

What changes are proposed in this pull request?

Improve distributed load

  1. Configurable job failure criteria
  2. A configuration option to control whether a load job is restored from the journal
  3. An option to skip files that are already fully loaded
  4. A retry count for failed files
  5. Bug fixes

Why are the changes needed?

To enhance the distributed load tool

Does this PR introduce any user facing changes?

Yes. The skip-if-exists option is added to the distributed load CLI.

@elega force-pushed the yimin/improve-distributed-load branch from c967e8e to c8bc49d on September 15, 2023 at 11:29
@JiamingMai (Contributor) commented:

LGTM

@JiamingMai added the type-feature (feature request) label on Sep 15, 2023
@elega (Contributor, Author) commented Sep 15, 2023:

alluxio-bot, merge this please.

@alluxio-bot merged commit 6a9f5fd into Alluxio:main on Sep 15, 2023
12 checks passed
@jja725 (Contributor) left a comment:

Thanks for the improvement, but I do have some concerns regarding retry and the health threshold.

  public static final PropertyKey MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_COUNT_THRESHOLD =
      intBuilder(Name.MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_COUNT_THRESHOLD)
          .setDefaultValue(-1)
          .setDescription("The load job total load failure count threshold. -1 means never fail.")
@jja725 (Contributor):

Any reason we set this to never fail by default? I think in a production environment we should fail fast if there are a lot of exceptions. Also, we record every failure, which can cause a lot of memory pressure on the cluster.

@elega (Contributor, Author):

Because sometimes the loading takes quite long. In my last experience, it took 2-3 days to load all the data from the UFS (about 300M files, 0.5 PB). If we fail the job partway through whenever there are too many errors, the job will never succeed. So what I did was let it load as much as possible and never fail.
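For illustration, a minimal sketch of how a total failure-count threshold with -1 meaning "never fail" can be enforced; the class and method names here are hypothetical and not the PR's actual code:

// Hypothetical sketch, not the PR's implementation: enforce a total
// failure-count threshold where -1 disables the check entirely.
public class LoadFailurePolicy {
  private final int mFailureThreshold; // value of the threshold property; -1 = never fail
  private int mTotalFailureCount = 0;

  public LoadFailurePolicy(int failureThreshold) {
    mFailureThreshold = failureThreshold;
  }

  /** Records one failed file and returns true if the job should now be failed. */
  public synchronized boolean recordFailureAndCheck() {
    mTotalFailureCount++;
    return mFailureThreshold >= 0 && mTotalFailureCount > mFailureThreshold;
  }
}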

@@ -2291,6 +2291,38 @@ public String toString() {
          .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
          .setScope(Scope.MASTER)
          .build();
  public static final PropertyKey MASTER_SCHEDULER_RESTORE_JOB_FROM_JOURNAL =
      booleanBuilder(Name.MASTER_SCHEDULER_RESTORE_JOB_FROM_JOURNAL)
@jja725 (Contributor):

I'm OK with adding an option, but I would prefer to have fewer properties; one of the big complaints from customers is that we have too many. In which situation would we benefit from setting this to false? The only one I can think of is a test environment.

@elega (Contributor, Author):

I guess we discussed this offline. I proposed stopping the restoration of incomplete jobs from the journal, which you had concerns about; that's why I added an option for it. Mind reminding me if there's a better way?

When I used this distributed load tool on a real cluster, I noticed that after restarting the cluster the job just starts again unexpectedly, which created some operational challenges.
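To make the intent concrete, a hedged sketch of how a restore-from-journal switch could gate job restoration when the master starts; only the property name comes from the diff, the surrounding class and method are hypothetical:

// Hypothetical sketch: skip restoring journaled jobs when the
// restore-from-journal property is disabled. Not the PR's actual code.
import java.util.List;

public final class JournalRestoreGate {
  private JournalRestoreGate() {}

  /**
   * @param restoreFromJournal value of MASTER_SCHEDULER_RESTORE_JOB_FROM_JOURNAL
   * @param journaledJobs unfinished jobs recovered from the journal
   */
  public static void maybeResume(boolean restoreFromJournal, List<Runnable> journaledJobs) {
    if (!restoreFromJournal) {
      return; // operators can keep unfinished jobs from resuming after a restart
    }
    for (Runnable job : journaledJobs) {
      job.run(); // resume each unfinished job
    }
  }
}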

@@ -108,11 +108,14 @@ public class DoraLoadJob extends AbstractJob<DoraLoadJob.DoraLoadTask> {

  // Job states
  private final Queue<String> mRetryFiles = new ArrayDeque<>();
  private final Map<String, Integer> mRetryCount = new ConcurrentHashMap<>();
@jja725 (Contributor):

I don't quite understand the point of a retry count: if an error is retryable we just retry (and shouldn't need to retry many times), and if it's not retryable we simply don't retry. What we should do is pass the correct exception to the scheduler.

@elega (Contributor, Author):

First, this retryable flag isn't reliable. Workers populate it based on the exception type, and sometimes we don't handle that correctly; e.g., in the previous implementation any UnknownRuntimeException was marked as not retryable, which is mostly wrong. In addition, individual UFS implementations may throw different exceptions, so it's hard to set a correct flag on the worker side.

Second, even if an error is retryable, we should not retry it indefinitely; endless retries might block the following tasks or degrade performance significantly.

Also, sometimes the retryable flag isn't available at all. What if the RPC itself fails? Do we want to retry in that case too?
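A minimal sketch of the bounded-retry idea discussed above, built around the mRetryFiles queue and mRetryCount map shown in the diff; the retry limit and method name are assumptions, not the PR's actual code:

// Illustrative sketch only; the limit and method name are assumed.
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;

public class BoundedRetryTracker {
  private static final int MAX_RETRIES_PER_FILE = 3; // assumed limit

  private final Queue<String> mRetryFiles = new ArrayDeque<>();
  private final Map<String, Integer> mRetryCount = new ConcurrentHashMap<>();

  /** Queues a failed file for another attempt unless it has hit the limit. */
  public synchronized boolean retryOrGiveUp(String ufsPath) {
    int attempts = mRetryCount.merge(ufsPath, 1, Integer::sum);
    if (attempts > MAX_RETRIES_PER_FILE) {
      return false; // give up and count the file as permanently failed
    }
    mRetryFiles.offer(ufsPath);
    return true;
  }
}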

@jja725 (Contributor):

https://www.alibabacloud.com/help/en/oss/developer-reference/error-handling-1
Regarding error handling, we can polish our UFS exceptions, such as AlluxioS3Exception, and handle each exception type properly. I agree that we could have a retry limit for failed files to avoid unlimited retries, but it should be combined with the health threshold to ensure we are not storing too many failed files.

      AlluxioRuntimeException t = AlluxioRuntimeException.from(e);
      errors.add(LoadFileFailure.newBuilder().setUfsStatus(status.toProto())
          .setCode(t.getStatus().getCode().value())
          .setRetryable(true)
@jja725 (Contributor):

Why always set retryable to true? Then what's the meaning of the retryable flag?

@elega (Contributor, Author):

Because this flag is not reliable: we are not able to tell whether an exception is really retryable just by looking at t.isRetryable(). This caused a couple of issues when I used the tool previously, either ignoring retryable errors or retrying endlessly. If you want to improve this, I think you can default retryable to true and only mark a handful of exceptions as non-retryable.
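A sketch of the approach suggested here: default to retryable and mark only a short, explicit list of clearly permanent errors as non-retryable. The specific exception types chosen below are illustrative assumptions, not the PR's actual classification:

// Illustrative sketch; the non-retryable exception types are assumptions.
import java.io.FileNotFoundException;
import java.nio.file.AccessDeniedException;

public final class RetryableClassifier {
  private RetryableClassifier() {}

  /** Returns false only for errors that a retry cannot fix. */
  public static boolean isRetryable(Throwable t) {
    return !(t instanceof FileNotFoundException
        || t instanceof AccessDeniedException
        || t instanceof IllegalArgumentException);
  }
}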

          .setCode(t.getStatus().getCode().value())
          .setRetryable(t.isRetryable() && permissionCheckSucceeded)
          .setMessage(t.getMessage()).build());
      if (!loadData || !status.isFile()) {
@jja725 (Contributor):

Nit: I'm not sure changing the else condition to continue helps readability.

@jja725 (Contributor) commented Sep 15, 2023:

And could you elaborate more on the bug-fixing part? It may be hiding somewhere I didn't notice.
