Add distributor which schedule task to be fairly #333

Merged
merged 4 commits into from
Nov 23, 2018

EmmyMiao87
Contributor

@EmmyMiao87 EmmyMiao87 commented Nov 21, 2018

Step 1: updateBeIdTaskMaps: remove BEs (backends) that are no longer alive and add newly alive BEs.
Step 2: Process timed-out tasks. If a task has already been allocated to a BE but has not finished within DEFAULT_TASK_TIMEOUT_MINUTES, it is discarded.
At the same time, the partitions that belonged to the old task are allocated to a new task. The new task, which carries a signature, is added to the needSchedulerRoutineLoadTask queue.
Step 3: Process every task in needSchedulerRoutineLoadTask and allocate each one to a BE. The task is then executed by that backend.
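
The three steps can be sketched roughly as follows. This is a minimal, self-contained illustration, not the code in this PR: the class `SchedulerSketch`, its fields, the timeout value, and the per-backend slot limit are all assumptions, and a task is reduced to a signature plus a start time.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical sketch of the three-step cycle; names and values are assumptions.
class SchedulerSketch {
    static final long TASK_TIMEOUT_MS = 10_000L; // assumed timeout
    static final int MAX_TASKS_PER_BE = 2;       // assumed per-backend slot limit

    final Map<Long, Integer> beIdToTaskCount = new HashMap<>();
    final Map<Long, Long> taskStartMs = new HashMap<>(); // signature -> start time
    final Map<Long, Long> taskToBe = new HashMap<>();    // signature -> backend id
    final Queue<Long> needSchedule = new ArrayDeque<>(); // signatures waiting for a backend
    long nextSignature = 1L;

    // Step 1: forget backends that are no longer alive, register new ones.
    void updateBackends(Set<Long> aliveBeIds) {
        beIdToTaskCount.keySet().retainAll(aliveBeIds);
        for (Long beId : aliveBeIds) {
            beIdToTaskCount.putIfAbsent(beId, 0);
        }
    }

    // Step 2: discard timed-out tasks and queue a replacement for each.
    void processTimeoutTasks(long nowMs) {
        Iterator<Map.Entry<Long, Long>> it = taskStartMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            if (nowMs - entry.getValue() <= TASK_TIMEOUT_MS) {
                continue;
            }
            Long signature = entry.getKey();
            it.remove();
            Long beId = taskToBe.remove(signature);
            if (beId != null) {
                beIdToTaskCount.computeIfPresent(beId, (id, count) -> count - 1);
            }
            needSchedule.add(nextSignature++);
        }
    }

    // Step 3: hand each waiting task to the backend with the fewest tasks.
    void scheduleTasks(long nowMs) {
        while (!needSchedule.isEmpty()) {
            Long beId = leastLoadedBackend();
            if (beId == null) {
                break; // every backend is full; try again next cycle
            }
            Long signature = needSchedule.poll();
            taskStartMs.put(signature, nowMs);
            taskToBe.put(signature, beId);
            beIdToTaskCount.merge(beId, 1, Integer::sum);
        }
    }

    // Returns null when no backend has a free slot.
    Long leastLoadedBackend() {
        Long best = null;
        int bestCount = MAX_TASKS_PER_BE;
        for (Map.Entry<Long, Integer> entry : beIdToTaskCount.entrySet()) {
            if (entry.getValue() < bestCount) {
                best = entry.getKey();
                bestCount = entry.getValue();
            }
        }
        return best;
    }
}
```

Returning null from `leastLoadedBackend` when nothing is free is one possible way to make the "no available backend" case explicit to the caller.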

RoutineLoadJob routineLoadJob = routineLoad.getJobByTaskId(routineLoadTaskInfo.getSignature());
RoutineLoadTask routineLoadTask = null;
if (routineLoadTaskInfo instanceof KafkaTaskInfo) {
routineLoadTask = new KafkaRoutineLoadTask(routineLoadJob.getResourceInfo(),
Contributor

Would it be better to add an interface function that creates the LoadTask, like RoutineLoadJob.createTask(TaskInfo)?

Member

@wuyunfeng wuyunfeng Nov 21, 2018

Is this better than? @imay

Contributor

> Is this better than? @imay

I think so, because this encapsulates the creation of KafkaRoutineLoadTask inside KafkaRoutineLoadJob.

If we add a new type of job, we won't need to change the code here; we just add another RoutineLoadJob and RoutineLoadTask.
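
A minimal sketch of that suggestion, assuming a factory method on the job. All types here are hypothetical stand-ins (suffix `Sketch`); the real classes in the PR carry far more state and their constructors differ.

```java
// Hypothetical stand-ins for the task-info and task types.
abstract class TaskInfoSketch {
    final long signature;
    TaskInfoSketch(long signature) { this.signature = signature; }
}

class KafkaTaskInfoSketch extends TaskInfoSketch {
    KafkaTaskInfoSketch(long signature) { super(signature); }
}

abstract class TaskSketch {
    final long signature;
    TaskSketch(long signature) { this.signature = signature; }
}

class KafkaTaskSketch extends TaskSketch {
    KafkaTaskSketch(long signature) { super(signature); }
}

// Each job type knows how to build its own kind of task,
// so the scheduler needs no instanceof checks.
abstract class JobSketch {
    abstract TaskSketch createTask(TaskInfoSketch info);
}

class KafkaJobSketch extends JobSketch {
    @Override
    TaskSketch createTask(TaskInfoSketch info) {
        return new KafkaTaskSketch(info.signature);
    }
}
```

With this shape the scheduler would call `job.createTask(info)` and never mention the Kafka classes directly.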

Contributor

😰

routineLoad.processTimeOutTasks();

// get idle be task num
int totalIdleTaskNum = routineLoad.getTotalIdleTaskNum();
Contributor

Idle task? Or task to be scheduled? Could you change the name?


// get idle be task num
int totalIdleTaskNum = routineLoad.getTotalIdleTaskNum();
int allocatedTaskNum = 0;
Contributor

runningTask? I think 'allocate' is used for resources. A task is scheduled, not allocated.

private RoutineLoad routineLoad = Catalog.getInstance().getRoutineLoadInstance();

@Override
protected void runOneCycle() {
Contributor

I suggest moving the body of this function into another function such as process(), and having this function call process() and catch all Throwable, so that a runtime exception such as NullPointerException cannot escape from it.
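
A small sketch of that structure. The class `CycleSketch` and the injected `Runnable` are assumptions made to keep the example self-contained; the real daemon would log through its logger rather than `System.err`.

```java
// Hypothetical sketch: runOneCycle() delegates to the real work and
// swallows any Throwable so the daemon thread keeps running.
class CycleSketch {
    private final Runnable process; // stand-in for the real scheduling work
    int failedCycles = 0;

    CycleSketch(Runnable process) {
        this.process = process;
    }

    void runOneCycle() {
        try {
            process.run();
        } catch (Throwable t) {
            failedCycles++;
            System.err.println("Failed to process one round: " + t);
        }
    }
}
```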


public RoutineLoadTaskInfo(long signature) {
this.signature = signature;
this.lock = new ReentrantReadWriteLock(true);
Contributor

Does this simple class need a lock?

Contributor Author

After discussing with morningman, I will use one lock shared by all RoutineLoadTaskInfo instances, because the time spent holding the lock is negligible.

@@ -27,15 +28,18 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

public class RoutineLoad {
Contributor

I think RoutineLoadManager is better.

Contributor Author

@EmmyMiao87 EmmyMiao87 Nov 21, 2018

Why doesn't Load use LoadManager?

Contributor Author

@EmmyMiao87 EmmyMiao87 Nov 21, 2018

I think RoutineLoadManager is better too.

// diff beIds and beIdToMaxConcurrentTasks.keys()
List<Long> newBeIds = beIds.parallelStream().filter(entity -> beIdToMaxConcurrentTasks.get(entity) == null)
.collect(Collectors.toList());
List<Long> decommissionBeIds = beIdToMaxConcurrentTasks.keySet().parallelStream()
Contributor

Do not name it 'decommissionBeIds', because 'decommission' means the backend is being decommissioned.
Just name it 'unavailableBeIds'.

beIdToConcurrentTasks.remove(beId);
}
LOG.info("There are {} backends which participate in routine load scheduler. "
+ "There are {} new bes and {} decommission bes for routine load",
Contributor

change 'decommission'

private Map<Long, RoutineLoadTaskInfo> idToRoutineLoadTask;
// KafkaPartitions means that partitions belong to one task
// kafka partitions == routine load task (logical)
private Queue<RoutineLoadTaskInfo> needSchedulerRoutineLoadTask;
Contributor

needSchedulerRoutineLoadTask -> needSchedulerRoutineLoadTasks

runningTasks.removeAll(needSchedulerRoutineLoadTask);

for (RoutineLoadTaskInfo routineLoadTaskInfo : runningTasks) {
routineLoadTaskInfo.writeLock();
Contributor

It's highly recommended NOT to expose lock outside a class.
It will cause a lot of troubles.

Contributor

And I can't see why you need a lock here.
Is there anything in RoutineLoadTaskInfo that needs to be protected?

Contributor

‘idToRoutineLoadTask’ is a member of RoutineLoad, so it should not be protected by a lock in RoutineLoadTaskInfo.

Contributor Author

@EmmyMiao87 EmmyMiao87 Nov 21, 2018

Logically, a task can be handled by only one function at a time, either processTimeOutTask or commit task, so idToRoutineLoadTask really does need a segment lock. Considering the time that commit task takes, I will use one lock instead of one lock per task.

idToRoutineLoadTask.put(kafkaTaskInfo.getSignature(), kafkaTaskInfo);
needSchedulerRoutineLoadTask.add(kafkaTaskInfo);
}
LOG.debug("Task {} was ran more then {} minutes. It was removed and rescheduled");
Contributor

log without parameter?

for (Map.Entry<Long, Integer> entry : beIdToMaxConcurrentTasks.entrySet()) {
if (beIdToConcurrentTasks.get(entry.getKey()) == null) {
result = maxIdelTaskNum < entry.getValue() ? entry.getKey() : result;
maxIdelTaskNum = Math.max(maxIdelTaskNum, entry.getValue());
Contributor

maxIdelTaskNum: misspelling

Contributor

check all 'idel' misspelling, please...

Contributor Author

I will pay attention next time =_=

if (routineLoadTaskInfo != null) {
// when routine load task is not abandoned
if (routineLoad.getIdToRoutineLoadTask().get(routineLoadTaskInfo.getSignature()) != null) {
long beId = routineLoad.getMinTaskBeId();
Contributor

What if routineLoad.getMinTaskBeId() returns 0, which means no backend has an available slot?

Contributor Author

It will not return 0, because clusterIdleSlotNum is greater than 0.

routineLoad.addNumOfConcurrentTasksByBeId(beId);
}
} else {
LOG.debug("Task {} for job already has been discarded", routineLoadTaskInfo.getSignature());
Contributor

The correct grammar is: 'has already been' or 'has been already', not 'already has been'

// TODO(ml): init load task
- kafkaRoutineLoadTaskList.add(new KafkaRoutineLoadTask(getResourceInfo(), 0L, TTaskType.PUSH,
- dbId, tableId, 0L, 0L, 0L, SystemIdGenerator.getNextId()));
+ kafkaRoutineLoadTaskList.add(new KafkaTaskInfo(SystemIdGenerator.getNextId()));
Contributor

SystemIdGenerator will produce duplicate ids if the Frontend restarts or the Master FE changes.
If you don't want to persist this info, it is better to just use a UUID or a random Long.
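
For illustration, a UUID-based signature could look like the sketch below. The class and method names are hypothetical; only `java.util.UUID.randomUUID()` is a real JDK call. Note that the task classes in this PR take a `long` signature, so adopting this would mean changing that type.

```java
import java.util.UUID;

// Hypothetical helper: a signature that does not depend on in-memory
// counter state, so a restart cannot reset it to a previously used value.
class SignatureSketch {
    static UUID newSignature() {
        return UUID.randomUUID();
    }
}
```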

Contributor Author

I think maybe I can reuse CatalogIdGenerator, but CatalogIdGenerator would need a new field called name.

@@ -45,7 +46,7 @@ protected void runOneCycle() {
LOG.debug("there are {} job need scheduler", routineLoadJobList.size());
for (RoutineLoadJob routineLoadJob : routineLoadJobList) {
// judge nums of tasks more then max concurrent tasks of cluster
- List<RoutineLoadTask> routineLoadTaskList = null;
+ List<RoutineLoadTaskInfo> routineLoadTaskList = null;
try {
routineLoadJob.writeLock();
Contributor

Using routineLoadJob.writeLock() for protection here is weird.
Also, a lock should be used like this (lock() goes outside the try{}):
lock();
try {

} finally {
unlock();
}
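
Spelled out with `java.util.concurrent.locks.ReentrantReadWriteLock`, the idiom looks roughly like the sketch below. The class and its counter are hypothetical. The point of acquiring outside the `try` is that if `lock()` itself fails, the `finally` block does not run and so never calls `unlock()` on a lock that was not acquired, which would throw `IllegalMonitorStateException`.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical class showing the lock / try / finally idiom.
class LockIdiomSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
    private int counter = 0;

    int incrementAndGet() {
        lock.writeLock().lock(); // acquire before entering try
        try {
            counter++;
            return counter;
        } finally {
            lock.writeLock().unlock(); // always released, even on exception
        }
    }

    boolean isWriteLocked() {
        return lock.isWriteLocked();
    }
}
```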

Contributor Author

Using routineLoadJob.writeLock() means that the processing of different RoutineLoadJobs does not block each other.

@EmmyMiao87 EmmyMiao87 force-pushed the master branch 2 times, most recently from 382f4df to 38f90fd Compare November 21, 2018 14:04
@EmmyMiao87 EmmyMiao87 changed the title Add distributor which allocate task to be fairly Add distributor which scheduler task to be fairly Nov 21, 2018
@EmmyMiao87 EmmyMiao87 changed the title Add distributor which scheduler task to be fairly Add distributor which schedule task to be fairly Nov 21, 2018
- public Map<Long, RoutineLoadTask> getIdToRoutineLoadTask() {
- return idToRoutineLoadTask;
+ public Map<Long, RoutineLoadTaskInfo> getIdToRoutineLoadTask() {
+ readLock();
Contributor

This lock protects nothing...
After the caller gets 'idToRoutineLoadTask', it can do anything without lock protection.
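
One possible way to address this, sketched under assumptions: return a copy made while the read lock is held, so callers never touch the shared map without protection. `TaskRegistrySketch` and its `String` values are hypothetical stand-ins; another option is to keep the map private and expose only operations that lock internally.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical registry: the shared map never leaves the class.
class TaskRegistrySketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<Long, String> idToTask = new HashMap<>();

    void put(long id, String task) {
        lock.writeLock().lock();
        try {
            idToTask.put(id, task);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // The copy is taken under the lock; later changes to the copy
    // do not affect the registry, and vice versa.
    Map<Long, String> snapshot() {
        lock.readLock().lock();
        try {
            return new HashMap<>(idToTask);
        } finally {
            lock.readLock().unlock();
        }
    }

    boolean contains(long id) {
        lock.readLock().lock();
        try {
            return idToTask.containsKey(id);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```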

}
}

public long getMinTaskBeId() {
Contributor

It DOES return 0L in some cases.

}

public Queue<RoutineLoadTaskInfo> getNeedSchedulerRoutineLoadTasks() {
readLock();
Contributor

Still, this lock protects nothing.

@@ -251,6 +360,34 @@ public void updateRoutineLoadJobState(RoutineLoadJob routineLoadJob, RoutineLoad
}
}

public void processTimeOutTasks() {
Contributor

Timeout, not TimeOut


for (RoutineLoadTaskInfo routineLoadTaskInfo : runningTasks) {
if ((System.currentTimeMillis() - routineLoadTaskInfo.getLoadStartTimeMs())
> DEFAULT_TASK_TIMEOUT_MINUTES * 60 * 1000) {
Contributor

5 min is too long? 10 sec I think?

@@ -45,38 +46,37 @@ protected void runOneCycle() {
LOG.debug("there are {} job need scheduler", routineLoadJobList.size());
for (RoutineLoadJob routineLoadJob : routineLoadJobList) {
// judge nums of tasks more then max concurrent tasks of cluster
- List<RoutineLoadTask> routineLoadTaskList = null;
+ List<RoutineLoadTaskInfo> routineLoadTaskList = null;
routineLoadJob.writeLock();
Contributor

NOT good

try {
process();
} catch (Throwable e) {
LOG.error("Failed to process one round of RoutineLoadTaskScheduler with error message {}",
Contributor

warn level is appropriate


for (RoutineLoadTaskInfo routineLoadTaskInfo : runningTasks) {
if ((System.currentTimeMillis() - routineLoadTaskInfo.getLoadStartTimeMs())
> DEFAULT_TASK_TIMEOUT_SECONDS * 60 * 1000) {
Contributor

remove 60
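
The arithmetic behind this: a value in seconds needs only `* 1000` to become milliseconds, so the leftover `* 60` makes the timeout 60 times longer than intended. A sketch that avoids hand-written factors by using `TimeUnit`; the class name and the 10-second value are assumptions (the value follows the "10 sec" suggestion earlier in this review).

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: let TimeUnit do the unit conversion.
class TimeoutSketch {
    static final long DEFAULT_TASK_TIMEOUT_SECONDS = 10; // assumed value

    static boolean isTimedOut(long startMs, long nowMs) {
        long timeoutMs = TimeUnit.SECONDS.toMillis(DEFAULT_TASK_TIMEOUT_SECONDS);
        return nowMs - startMs > timeoutMs;
    }
}
```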

} finally {
readUnlock();
switch (jobState) {
case NEED_SCHEDULER:
Contributor

Why not just:
stateJobs = idToRoutineLoadJob.values().stream()
.filter(entity -> entity.getState() == jobState)
.collect(Collectors.toList());
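
A runnable version of the same idea, with the result assigned to a variable. The `Job` class and the `RUNNING` and `PAUSED` states are hypothetical; only `NEED_SCHEDULER` appears in this PR.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: one filter replaces a switch with a branch per state.
class JobFilterSketch {
    enum JobState { NEED_SCHEDULER, RUNNING, PAUSED } // RUNNING, PAUSED are assumed

    static class Job {
        final long id;
        final JobState state;

        Job(long id, JobState state) {
            this.id = id;
            this.state = state;
        }
    }

    static List<Job> jobsInState(Map<Long, Job> idToJob, JobState wanted) {
        List<Job> stateJobs = idToJob.values().stream()
                .filter(job -> job.state == wanted)
                .collect(Collectors.toList());
        return stateJobs;
    }
}
```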

Contributor

switch case is unnecessary.

e.getMessage(), e);
}
}

- private void process() {
+ private void process() throws LoadException {
Contributor

The default interval of the Daemon thread is 30 seconds, which means you have to wait at least 30 seconds to schedule the next batch of tasks?
Maybe you need a trigger mechanism?
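
One way such a trigger could work, as a sketch only: the scheduler waits on a semaphore with a timeout equal to the normal interval, and any code that enqueues a task releases a permit to wake it early. `TriggerSketch` and its method names are hypothetical and not part of this PR.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical trigger: wake the scheduler early instead of waiting
// for the full daemon interval.
class TriggerSketch {
    private final Semaphore wakeUp = new Semaphore(0);

    // Called by whoever adds work that should be scheduled soon.
    void trigger() {
        wakeUp.release();
    }

    // Returns true if woken by a trigger, false if the interval elapsed
    // or the thread was interrupted.
    boolean awaitNextCycle(long intervalMs) {
        try {
            boolean triggered = wakeUp.tryAcquire(intervalMs, TimeUnit.MILLISECONDS);
            wakeUp.drainPermits(); // collapse several triggers into one wake-up
            return triggered;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The scheduler loop would call `awaitNextCycle(30_000)` between rounds, so the 30-second interval becomes an upper bound on latency rather than a fixed delay.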

default:
break;
}
idToRoutineLoadJob.values().stream()
Contributor

You missed assigning 'stateJobs' variable....

@morningman morningman merged commit bbdf4fb into apache:master Nov 23, 2018
@morningman
Contributor

#353

4 participants