Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix gremlin job result size gt cassandra limit and lt hugegraph limit #1334

Merged
merged 3 commits into from
Jan 14, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package com.baidu.hugegraph.task;

import java.util.Date;
import java.util.Set;
import java.util.concurrent.Callable;

import org.apache.tinkerpop.gremlin.structure.Transaction;
Expand All @@ -30,13 +31,29 @@
import com.baidu.hugegraph.HugeGraphParams;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Log;
import com.google.common.collect.ImmutableSet;

public abstract class TaskCallable<V> implements Callable<V> {

private static final Logger LOG = Log.logger(HugeTask.class);

private static final String ERROR_MAX_LEN = "Failed to commit changes: " +
"The max length of bytes is";
private static final String ERROR_COMMIT = "Failed to commit changes: ";
private static final Set<String> ERROR_MESSAGES = ImmutableSet.of(
/*
* "The max length of bytes is" exception message occurs when
* task input size exceeds TASK_INPUT_SIZE_LIMIT or task result size
* exceeds TASK_RESULT_SIZE_LIMIT
*/
"The max length of bytes is",
/*
* "Batch too large" exception message occurs when using
* cassandra store and task input size is in
* [batch_size_fail_threshold_in_kb, TASK_INPUT_SIZE_LIMIT) or
* task result size is in
* [batch_size_fail_threshold_in_kb, TASK_RESULT_SIZE_LIMIT)
*/
"Batch too large"
);

private HugeTask<V> task = null;
private HugeGraph graph = null;
Expand Down Expand Up @@ -98,7 +115,8 @@ protected void save() {
*/
LOG.error("Failed to save task with error \"{}\": {}",
e, task.asMap(false));
if (e.getMessage().contains(ERROR_MAX_LEN)) {
String message = e.getMessage();
if (message.contains(ERROR_COMMIT) && needSaveWithEx(message)) {
task.failSave(e);
this.graph().taskScheduler().save(task);
return;
Expand Down Expand Up @@ -138,6 +156,15 @@ public static <V> TaskCallable<V> fromClass(String className) {
}
}

private static boolean needSaveWithEx(String message) {
for (String error : ERROR_MESSAGES) {
if (message.contains(error)) {
return true;
}
}
return false;
}

public static <V> TaskCallable<V> empty(Exception e) {
return new TaskCallable<V>() {
@Override
Expand Down