PR #7720: Design document of RemoteExecutor. (Closed; wants to merge 19 commits)

121 changes: 121 additions & 0 deletions — doc/design/dist_refactor/remote_executor.md

@@ -0,0 +1,121 @@
# Design Doc: RemoteExecutor
## Abstract
We propose some details to implement `RemoteExecutor`:

- Each `Partitioned IR` (intermediate representation) has a unique `IRID`.
- Each `Partitioned IR`, its relationships with other IRs, and its resource needs are stored in etcd.
**Comment (Contributor @helinwang, Jan 22, 2018):**

What is "others"? Maybe replace "others" with what it actually refers to here.

- Each `PaddlePaddle Runtime` runs the `Partitioned IR` it gets from etcd by `IRID`.
**Comment (Contributor):**

This makes our runtime depend on etcd. I think the dependency direction is wrong. If we really want to use etcd as a communication method, the runtime should define a communication interface, and an etcd adapter class should implement that interface. That way the framework does not depend on etcd; only the adapter does. This is the dependency inversion principle (in my opinion, a major benefit of using an OOP language).

Anyway, is etcd necessary? Why is communicating through etcd better than communicating through RPC? As described in the PR, it is for "So the executed Partitioned IR can communicate with each other by IRID even if some of them restart." However, isn't fault tolerance unrelated to the communication method? We can still communicate using RPC, and when we want to support fault tolerance the runtime can store state on disk or in etcd.

**Reply (Author @gongweibao, Jan 22, 2018):**

> Anyway, is etcd necessary? Why is communicating through etcd better than communicating through RPC?

The partitioned IRs communicate with each other over RPC. When the Paddle runtime starts, it gets its IR from etcd (the storage) and caches it locally. The local cache should be updated when a send/get hits an error (the remote may no longer exist).

- So the executed `Partitioned IR`s can communicate with each other by `IRID` even if some of them restart.

## Architecture graph
<div style="align: center">
<img src="src/remote_executor2.png" width="700" align=center/>
**Comment (Contributor):**

I think it makes more sense for the transpiler to run in the cloud, because the transpiler should know the resource information (e.g., how many trainers are running; with autoscaling that number changes).

**Comment (Contributor):**

I think the controller should tell the remote executor how many trainers are currently running, not the other way around. That way, we can run the remote executor on bare metal, where there is no controller.

**Comment (Contributor):**

Suggestion: draw a graph that specifies how this system runs on a bare-metal cluster (e.g., no k8s controller, no etcd), and another graph specifying how we integrate a k8s cloud with it. In the current graph, the remote executor cannot run without etcd or the controller.

**Comment (Contributor @helinwang, Jan 22, 2018):**

I really think that the controller (Paddle job) should be controlled by another Python API:

*(attached screenshot: "screen shot 2018-01-22 at 11 50 02 am")*

**Reply (Author @gongweibao, Jan 22, 2018):**

I have one question: where can RemoteExecutor save its state (the IRs)? On a distributed system?

**Comment (Contributor):**

This looks to me like there is one global gatekeeper (RemoteExecutor) orchestrating all the roles and traffic in and out of the cluster? I think this kind of remote executor should be one per job?

</div>

### Job's type
- Foreground Job: when the client exits, the job will be killed.
  - It's convenient for users to debug their programs.
  - The client needs to send a `HeartBeat` to the `RemoteExecutor` to report that it is still alive; otherwise, the `RemoteExecutor` will kill the job (see the heartbeat sketch after the review comment below).
- Background Job: the client's death doesn't affect the job.
**Comment (Contributor):**

I think we just need the background job; all jobs should be killed explicitly by the Paddle cloud management Python API.
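
A minimal sketch of the foreground-job heartbeat mentioned above. The `remotor.heartbeat()` call and the interval are assumptions for illustration; this document does not fix the actual interface:

```
import threading
import time

HEARTBEAT_INTERVAL = 5  # seconds; assumed value, not specified in this design

def heartbeat_loop(remotor, job_name, stop_event):
    # Periodically tell RemoteExecutor that the client is still alive.
    # If these reports stop arriving, RemoteExecutor kills the foreground job.
    while not stop_event.is_set():
        remotor.heartbeat(job_name)  # hypothetical RPC to RemoteExecutor
        time.sleep(HEARTBEAT_INTERVAL)

stop_event = threading.Event()
threading.Thread(target=heartbeat_loop,
                 args=(remotor, job_name, stop_event),
                 daemon=True).start()
```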


### PaddleRunTime
There is no fundamental difference between the `Trainer` and the `Parameter server`; they are both `PaddleRunTime`. They do different tasks only because they execute different `ProgramDesc`s.
Although we keep the `Trainer` and `Pserver` concepts in the pseudo code below, that is only so users can distinguish among the different `ProgramDesc`s. They are just names for `ProgramDesc`s.
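
A minimal sketch of this point (the `PaddleRunTime` class and `fetch_ir` helper are hypothetical names, not existing code): the same runtime entry point executes whichever `ProgramDesc` it is assigned.

```
def paddle_runtime_main(irid):
    # Trainer and pserver run exactly the same code; only the
    # Partitioned IR fetched by IRID differs.
    program_desc = fetch_ir(irid)   # hypothetical: read the Partitioned IR from storage
    runtime = PaddleRunTime()       # hypothetical runtime class
    runtime.run(program_desc)       # identical call for "Trainer" and "Pserver"
```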

## Pseudo code for users
**Comment (Contributor @helinwang, Jan 22, 2018):**

I think we should aim for the user only having to change the executor to a remote executor in the Python code, and nothing else. This pseudo code requires the user to change too many lines.
I know we currently need the user to change many lines, but I think we should aim for changing only a few.

```
import sys
import RemoteExecutor as remotor
...
# Get the IRs and their relationship.
# The trainer's iteration loop is implemented by a fluid op.
trainer_IR, pserver_IR = fluid.dist_transpiler(trainers_num=..., trainer_pass_num=...,
                                               pservers_num=..., mode="sync")

job_name = "test_1"

# You can kill an existing job first.
# if remotor.find(job_name):
#     remotor.kill(job_name)

# Start etcd and keep the heart-beat if needed.
job, err = remotor.init_job(job_name, type="foreground")
if err is not None:
    print("start job:", job_name, " errors:", err)
    sys.exit(1)

# Store the IRs with their resource needs into etcd.
trainers = job.add_workers(trainer_IR, cpu=..., gpu=..., mem=...)
pservers = job.add_workers(pserver_IR, cpu=..., gpu=..., mem=...)

# Start the trainer and pserver pods.
# Pod info will be stored into etcd after the pods start.
pservers.start()
trainers.start()

# Get results from the trainers (or other workers).
while True:
    accs = trainers.get(acc)
    for c in accs:
        print(" acc:" + str(c))
    # You can break according to accs.

job.stop()
```


## Storage
**Comment (Contributor @helinwang, Jan 22, 2018):**

I think we are mixing fault tolerance with communication here (as mentioned in #7720 (comment)). In my opinion we should separate them, because the two are very different and have different libraries/tools that fit best.

Another point: if we store all the IR communication in etcd, who will delete the IRs that are no longer necessary? In my opinion, we should use the library that is best for communication and not save the messages, and the library that is best for fault tolerance, overwriting only the necessary state.

- etcd is a key-value store, but we can easily convert a table to key-value style by using a combination key (see the key-layout sketch after the tables below).
- We store the info in multiple tables because some of them may change more frequently than others.

### Table: IR_Resource

| column name | description|
|----------|-------------|
| IRID | unique ID of IR |
| IR| `ProgramDesc` |
| send_var_IR_map | map of variables to the IRIDs they will be sent to |
| get_var_IR_map | map of variables to the IRIDs they will be fetched from |
|resource|resource needed by this IR|

### Table: IR_Pod
The rows in this table may change, since some pods may die.

| column name | description|
|----------|-------------|
|IRID|unique ID of IR|
|pod_name|pod name which executes IR|
|pod_ip|pod ip|
|pod_port|pod port|

### Table: IR_CheckPoints
| column name | description|
|----------|-------------|
|IRID|unique ID of IR|
|checkpoint|last checkpoint of this IR|
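
A sketch of how the tables above could be flattened into etcd "combination keys". The exact key layout below is an assumption for illustration, not a fixed schema:

```
# Keys combine job name, table name, IRID, and column name.
def resource_key(job, irid, column):
    # e.g. "/test_1/IR_Resource/3/IR" -> serialized ProgramDesc
    return "/{}/IR_Resource/{}/{}".format(job, irid, column)

def pod_key(job, irid, column):
    # e.g. "/test_1/IR_Pod/3/pod_ip" -> the pod's IP address
    return "/{}/IR_Pod/{}/{}".format(job, irid, column)

def checkpoint_key(job, irid):
    # e.g. "/test_1/IR_CheckPoints/3" -> last checkpoint of this IR
    return "/{}/IR_CheckPoints/{}".format(job, irid)
```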


## Fault tolerance
An IR can communicate with other IRs correctly by `IRID`.

- When a `send/get` operation hits an error or timeout, the executor should fetch the IR's updated relationship from etcd and retry (see the sketch below).
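
A sketch of this retry-with-refresh behavior. The `cached_pod_addr`, `rpc_send`, and `refresh_pod_cache` helpers are hypothetical; the document does not fix these interfaces:

```
def send_with_retry(runtime, dst_irid, payload, max_retries=3):
    # On an error or timeout the peer may have been rescheduled to a new pod,
    # so we re-read the IRID -> pod mapping (table IR_Pod) from etcd and retry.
    for _ in range(max_retries):
        addr = runtime.cached_pod_addr(dst_irid)    # local cache of IR_Pod info
        try:
            return runtime.rpc_send(addr, payload)  # hypothetical RPC call
        except (TimeoutError, ConnectionError):
            runtime.refresh_pod_cache(dst_irid)     # reload pod info from etcd
    raise RuntimeError("send to IR {} failed after {} retries".format(dst_irid, max_retries))
```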

## Notice
It's easier to implement distribution after we implement `fluid` as a new language: all functions are implemented in the `ProgramDesc`. So `dist_transpiler` needs to be upgraded to add the operators for what we previously implemented at the Python level.
For example, a `Trainer` needs
```
Init from the startup program desc, or init from the Parameter Server (for fault-tolerant mode).  # previously executed in Python.
Run the iteration loop.  # the loop was previously driven by Python.
Save the results where they can be fetched.  # in the Python code this may be implemented as `print`.
```

A `Parameter Server` needs
```
Init from the startup program desc, or init from a checkpoint (for fault-tolerant mode).  # previously executed in Python.
Wait to receive gradients and update the parameters.
Wait for the trainers to fetch the updated parameters.
```
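
For contrast, a rough sketch of what the Python side does today with a local executor (standard Fluid usage; `place`, `num_passes`, `train_reader`, `feeder`, and `avg_cost` are placeholders). The init, the loop, and the result handling below are exactly the pieces `dist_transpiler` would have to move into the `ProgramDesc` as operators:

```
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())       # init: currently done in Python
for pass_id in range(num_passes):               # iteration loop: currently a Python loop
    for data in train_reader():
        loss, = exe.run(fluid.default_main_program(),
                        feed=feeder.feed(data),
                        fetch_list=[avg_cost])
        print("loss:", loss)                    # "save results": currently just a print
```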

## Autoscaling
TODO

## Reference
[Design Doc: Distributed Training Architecture](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/design/dist_refactor/distributed_architecture.md)