gradient compression support #225

Merged
merged 386 commits into from
Aug 13, 2020
Changes from 250 commits
Commits
7079d5e
compression: fix server bug
jasperzhong Feb 19, 2020
6a5b6f2
compression: fix check
jasperzhong Feb 19, 2020
cc8a51a
compression: fix bug
jasperzhong Feb 19, 2020
ac86627
compression: fix a bug
jasperzhong Feb 19, 2020
55f84b4
compression: rm check
jasperzhong Feb 19, 2020
170b0fb
compression: fix bug
jasperzhong Feb 19, 2020
247b3e3
compression: fix decompress
jasperzhong Feb 19, 2020
1c4df37
compression: fix bug
jasperzhong Feb 19, 2020
4eebf26
compression: fix fatal bug
jasperzhong Feb 19, 2020
21fd9ef
compression: add log
jasperzhong Feb 19, 2020
ecfac06
compression: add log
jasperzhong Feb 20, 2020
af041b1
compression: fix fatal bug
jasperzhong Feb 20, 2020
9b2b545
compression: test
jasperzhong Feb 20, 2020
4c68f80
compression: rm logs
jasperzhong Feb 21, 2020
47b7302
compression: rename
jasperzhong Feb 21, 2020
303a0c8
compression: fix bug
jasperzhong Feb 21, 2020
3df0ec3
compression: fix typo
jasperzhong Feb 21, 2020
05eee8c
compression: refactor
jasperzhong Feb 23, 2020
8a6c947
compression: refactor
jasperzhong Feb 23, 2020
2df9a01
compression: fix typo
jasperzhong Feb 23, 2020
0025204
compression: fix bug
jasperzhong Feb 23, 2020
719915b
compression: fix bug
jasperzhong Feb 23, 2020
d24e035
compression: fix fatal bug
jasperzhong Feb 24, 2020
67bb376
compression: fix typo
jasperzhong Feb 24, 2020
3b9bf42
compression: fix test
jasperzhong Feb 24, 2020
90fe93d
compression: test add shutdown
jasperzhong Feb 24, 2020
4285ced
compression: add exit
jasperzhong Feb 24, 2020
df0abcf
compression: rm
jasperzhong Feb 24, 2020
7c4f086
compression: finish ef
jasperzhong Feb 24, 2020
fc259c7
compression: fix error
jasperzhong Feb 24, 2020
21ff784
compression: fix bug
jasperzhong Feb 24, 2020
c93bf95
compression: fix fatal bug
jasperzhong Feb 25, 2020
eca5893
compression: add args
jasperzhong Feb 25, 2020
d269496
compression: init zero
jasperzhong Feb 25, 2020
b9df722
compression: fix ef bug
jasperzhong Feb 25, 2020
f02d0a7
compression: use mean ef in server
jasperzhong Feb 25, 2020
00d462d
compression: pack should inplace
jasperzhong Feb 25, 2020
2b6c091
compression: add mean
jasperzhong Feb 26, 2020
003f02a
compression: rm ops.cc mean when enable ef
jasperzhong Feb 26, 2020
47aed64
compression: onebit add scale
jasperzhong Feb 26, 2020
06971ed
compression: fix typo
jasperzhong Feb 26, 2020
d03834d
compression: fix typo
jasperzhong Feb 26, 2020
08f6e26
compression: fix bug
jasperzhong Feb 26, 2020
4cef1ee
compression: add ef to ns
jasperzhong Feb 26, 2020
2a24707
compression: fix bug
jasperzhong Feb 26, 2020
a97fff0
compression: fix decompress with ef bug
jasperzhong Feb 26, 2020
101a0a6
compression: make scaled onebit optional
jasperzhong Feb 27, 2020
c3a6d1a
compression: add scale in script
jasperzhong Feb 27, 2020
7f0342a
compression: rm debuf
jasperzhong Feb 27, 2020
5354b63
compression: rm debuf
jasperzhong Feb 27, 2020
91eaf30
fix fatal bug
jasperzhong Mar 2, 2020
88b6027
compression: add const
jasperzhong Mar 2, 2020
a7b38c1
compression: add const
jasperzhong Mar 2, 2020
d857211
compression: support partition for worker
jasperzhong Mar 3, 2020
0a3c8d3
compression: add fp16 op support
jasperzhong Mar 3, 2020
80a77aa
compression: support for fp16
jasperzhong Mar 4, 2020
151a044
compression: fix missing
jasperzhong Mar 4, 2020
1bb51ec
compression: fix fp16 avx
jasperzhong Mar 4, 2020
23122e7
compression: fix typo
jasperzhong Mar 4, 2020
bc0c76f
compression: fix typo
jasperzhong Mar 4, 2020
414716e
compression: fix typo
jasperzhong Mar 4, 2020
c8d219c
compression: fix typo
jasperzhong Mar 4, 2020
0bbf3bc
compression: rm some warnings
jasperzhong Mar 4, 2020
1563b29
compression: fix bug
jasperzhong Mar 4, 2020
489662c
compression: rm a check
jasperzhong Mar 4, 2020
a028ad8
compression: fix align size bug
jasperzhong Mar 4, 2020
5f61e9c
compression: test fp16 fp64
jasperzhong Mar 4, 2020
f54f7fb
compression: fix test bug
jasperzhong Mar 4, 2020
dd3dd07
compression: add mxnet fp16 compression support
jasperzhong Mar 4, 2020
61bcb65
compression: add script fp args
jasperzhong Mar 4, 2020
60b24a3
compression: rename
jasperzhong Mar 4, 2020
e92854a
compression: fix fp16 type bug
jasperzhong Mar 4, 2020
9498d88
compression: fix intra-compression bug
jasperzhong Mar 4, 2020
3d43588
compression: remove wait_to_read
jasperzhong Mar 5, 2020
d2b3976
compression: fix bug
jasperzhong Mar 6, 2020
fd291e7
compression: fix compressed released bug
jasperzhong Mar 7, 2020
f4ee9f4
compression: shallow copy to prevent gc
jasperzhong Mar 7, 2020
608d609
compression: use shared_ptr
jasperzhong Mar 7, 2020
1a235fe
compression: add copy
jasperzhong Mar 7, 2020
16e4ed1
compression: fix bug
jasperzhong Mar 7, 2020
848b556
compression: fix typo
jasperzhong Mar 7, 2020
86da936
compression: fix bug
jasperzhong Mar 7, 2020
28dec70
compression: add gluon-cv imagenet script
jasperzhong Mar 9, 2020
c74770e
compression: add momentum support
jasperzhong Mar 10, 2020
b12deca
compression: update script
jasperzhong Mar 10, 2020
83fe21e
compression: disable server mom
jasperzhong Mar 10, 2020
96a8133
compression: fix bug
jasperzhong Mar 10, 2020
c7f17ad
compression: fix typo
jasperzhong Mar 10, 2020
8ef547d
compression: fix bug
jasperzhong Mar 11, 2020
d56c91d
compression: add mu
jasperzhong Mar 11, 2020
76dff7b
compression: fix fatal impl missing
jasperzhong Mar 11, 2020
22464c5
compression: update imagenet script
jasperzhong Mar 11, 2020
882778f
compression: fix c++0x compile error
jasperzhong Mar 11, 2020
b255357
compression: fix typo
jasperzhong Mar 11, 2020
b778c29
compression: fix imagenet trianing script
jasperzhong Mar 11, 2020
7147b58
compression: fix typo
jasperzhong Mar 11, 2020
9be30ce
compression: advance registration for server
jasperzhong Mar 12, 2020
3f5640e
compression: update register compressor
jasperzhong Mar 12, 2020
9f42e3f
compression: fix bug
jasperzhong Mar 12, 2020
1734e3b
compression: fix bug
jasperzhong Mar 16, 2020
2f25d03
compression: add register sync
jasperzhong Mar 16, 2020
594e580
compression: fix typo
jasperzhong Mar 16, 2020
0472fe9
compression: fix bug
jasperzhong Mar 16, 2020
cb65d21
compression: fix typo
jasperzhong Mar 16, 2020
ceff0ab
compression: fix bug
jasperzhong Mar 16, 2020
074c843
compression: add check
jasperzhong Mar 16, 2020
e9ee76c
compression: fix bug
jasperzhong Mar 16, 2020
efd2b74
compression: support async compression in server
jasperzhong Mar 17, 2020
f17bcdc
compression: fix bug
jasperzhong Mar 17, 2020
30b2273
compression: rm useless comments
jasperzhong Mar 17, 2020
613a06f
compression: disable small tensor
jasperzhong Mar 17, 2020
270b0bb
compression: update openmp
jasperzhong Mar 17, 2020
871503d
compression: fix compile bug
jasperzhong Mar 17, 2020
deda2f0
compression: fix bug
jasperzhong Mar 17, 2020
5da1d3a
compression: fix bug
jasperzhong Mar 18, 2020
f275227
compression: fix typo
jasperzhong Mar 18, 2020
8016349
compression: adjust omp thread num
jasperzhong Mar 18, 2020
440c822
compression: make min compress bound mutable
jasperzhong Mar 18, 2020
af649c3
compression: use max threads
jasperzhong Mar 18, 2020
8cffe9f
async: norm1 & rm scale
jasperzhong Mar 18, 2020
e5662c7
async: static_assert
jasperzhong Mar 18, 2020
396e3c0
async: error-feedback
jasperzhong Mar 18, 2020
5a66972
async: rm all async
jasperzhong Mar 18, 2020
236f1c2
async: enable norm1
jasperzhong Mar 18, 2020
a80818a
async: set omp threads
jasperzhong Mar 19, 2020
f29ffd0
compression: update script
jasperzhong Mar 19, 2020
2b0c8e2
async: compress in worker side
jasperzhong Mar 20, 2020
8b01a30
async: mv log
jasperzhong Mar 21, 2020
db95e53
async: fix multi-gpus bugs
jasperzhong Mar 21, 2020
0b7e136
async: fix typo
jasperzhong Mar 21, 2020
6683233
async: add two loops
jasperzhong Mar 22, 2020
5af389a
async: add func loops
jasperzhong Mar 23, 2020
490be97
speedup: rm std::async
jasperzhong Mar 23, 2020
bb41159
speedup: set omp thread_num 1
jasperzhong Mar 23, 2020
054d807
compression: update script
jasperzhong Mar 24, 2020
f31724d
compression: add thread_pool
jasperzhong Mar 24, 2020
71c699d
compression: update script
jasperzhong Mar 24, 2020
17da182
compression: set pool size = 8
jasperzhong Mar 24, 2020
cd6eb38
compression: fix typo
jasperzhong Mar 24, 2020
f14a605
compression: add cifar100 script
jasperzhong Mar 25, 2020
75b60a9
compression: update script
jasperzhong Mar 25, 2020
2fa725f
compression: update script
jasperzhong Mar 25, 2020
6bad2db
compression: update script
jasperzhong Mar 25, 2020
055dd35
compression: fix typo
jasperzhong Mar 25, 2020
51ab4ad
script: update
jasperzhong Apr 22, 2020
c239431
compression: update 1bit
jasperzhong Apr 28, 2020
c6245a1
compression: rm final
jasperzhong Apr 28, 2020
963a273
compression: fix typo
jasperzhong Apr 28, 2020
cb500cf
compression: fix typo
jasperzhong Apr 28, 2020
1338d0d
compression: update script
jasperzhong Apr 28, 2020
88fafaf
compression: update script
jasperzhong Apr 28, 2020
d0de4d6
compression: update script
jasperzhong Apr 29, 2020
1ab6b12
compression: add time and fix typo
jasperzhong Apr 29, 2020
beead76
compression: use mmap
jasperzhong Apr 29, 2020
90ab3b1
compression: update scripts
jasperzhong Apr 29, 2020
6ec283d
compression: fix script typo
jasperzhong Apr 29, 2020
688e36c
compression: fix mmap sigbus
jasperzhong Apr 29, 2020
e371b1f
comrpession: update header
jasperzhong Apr 29, 2020
9e98e84
compression: add check and errno
jasperzhong Apr 29, 2020
6d61bcd
compression: fix division zero
jasperzhong Apr 29, 2020
0202364
compression: release resourses
jasperzhong Apr 29, 2020
602e575
compression: use double lr
jasperzhong Apr 29, 2020
8940668
compression: update script (default opt is nag)
jasperzhong Apr 30, 2020
a6155d8
register: refactor
jasperzhong May 3, 2020
0a579e1
register: rm extra param
jasperzhong May 3, 2020
356cbea
register: debug
jasperzhong May 3, 2020
85edef9
register: fix typo
jasperzhong May 3, 2020
3706abe
register: update argument parser
jasperzhong May 3, 2020
776d5d7
compression: add weight decay momentum
jasperzhong Apr 30, 2020
9fdec63
1bit-wd: fix static_method bug
jasperzhong Apr 30, 2020
709e2cf
1bit-wd: fix typo
jasperzhong Apr 30, 2020
81a412a
1bit-wd: fix typo
jasperzhong Apr 30, 2020
c2b17bb
1bit-wd: fix bug
jasperzhong Apr 30, 2020
00b49e8
1bit-wd: init mom
jasperzhong Apr 30, 2020
748ce02
1bit-wd: use cache
jasperzhong Apr 30, 2020
d16ac29
debug
jasperzhong Apr 30, 2020
d58cd1e
1bit-wd: fix
jasperzhong Apr 30, 2020
110249f
1bit-wd: fix typo
jasperzhong Apr 30, 2020
065f2ca
1bit-wd: add mnist test
jasperzhong May 3, 2020
b01e52c
1bit-wd: refactor interface
jasperzhong May 3, 2020
239e8af
1bit-wd: update register
jasperzhong May 4, 2020
c84e722
1bit-wd: update script
jasperzhong May 4, 2020
38ad4a0
1bit-wd: add log
jasperzhong May 4, 2020
9ea0ba1
1bit-wd: fix fatal bug
jasperzhong May 4, 2020
4a5ea5f
1bit-wd: rm logs
jasperzhong May 4, 2020
bcff3da
1bit-wd: fix typo
jasperzhong May 4, 2020
258b476
1bit-wd: fix ref
jasperzhong May 4, 2020
23fdcdd
1bit-wd: fix copy bug
jasperzhong May 4, 2020
2fb7a80
1bit-wd: fix typo
jasperzhong May 4, 2020
1de9174
1bit-wd: fix typo in script
jasperzhong May 4, 2020
b841f7b
1bit-wd: fix list out of range bug
jasperzhong May 4, 2020
7a0bea6
compression: async wd momentum (#6)
jasperzhong May 9, 2020
b261e4b
hotfix: use default num_threads (#7)
jasperzhong May 9, 2020
f6fd608
hotfix: use concurrent.futures.Timeout (#8)
jasperzhong May 10, 2020
1e40fd9
compression: speed up wd momentum with threading.Thread (#9)
jasperzhong May 12, 2020
2379e17
compression: add topk compressor (#10)
jasperzhong May 14, 2020
720d7e9
compression: add randomk compressor (#11)
jasperzhong May 14, 2020
d771932
numa: finetune support (#12)
jasperzhong May 14, 2020
afb96c8
rename: more readable (#13)
jasperzhong May 16, 2020
3712c33
hotfix: update non-compression case register error (#14)
jasperzhong May 16, 2020
f881fb4
compression: allreduce results for training scripts (#16)
jasperzhong May 25, 2020
3844228
hotfix: file mode use append (#17)
jasperzhong May 26, 2020
c17bfb6
compression: optimize implementation of compressors (#18)
jasperzhong May 30, 2020
7f12e15
cifar: update (#19)
jasperzhong May 30, 2020
6c45c79
hotfix: fix bugs (#20)
jasperzhong Jun 1, 2020
f7d9969
hotfix: fix fatal bug of new 1bit (#21)
jasperzhong Jun 10, 2020
ae30478
1bit: use double for scaling (#22)
jasperzhong Jun 17, 2020
c507e14
1bit: update wd mom (#23)
jasperzhong Jun 20, 2020
2a98d12
refactor: format and rename (#24)
jasperzhong Jun 24, 2020
d063a38
test: add compressor test cases (#25)
jasperzhong Jun 25, 2020
14a5bc0
misc: add comments (#26)
jasperzhong Jun 26, 2020
3347570
1bit: update FastUpdateErrorImpl (#27)
jasperzhong Jun 27, 2020
e07d7ec
random: swtich to xorshift128p rng backend (#28)
jasperzhong Jun 29, 2020
bc2ab6d
misc: use macros (#29)
jasperzhong Jun 29, 2020
6c44049
1bit: more parallelism (#31)
jasperzhong Jul 1, 2020
3952f43
Dithering (#33)
jasperzhong Jul 7, 2020
79f35b1
dithering: add ef support (#34)
jasperzhong Jul 7, 2020
478c50b
update to lastest ps-lite
jasperzhong Jul 8, 2020
90150a4
add docs (#35)
jasperzhong Jul 9, 2020
d08bdc6
Update byteps/common/core_loops.cc
jasperzhong Jul 10, 2020
39391d2
Update byteps/common/core_loops.cc
jasperzhong Jul 10, 2020
4522c52
dithering: optimize (#43)
jasperzhong Jul 19, 2020
a201d52
misc: fix typos
jasperzhong Jul 19, 2020
7da0596
Update byteps/common/thread_pool.h
jasperzhong Jul 21, 2020
1f190ed
misc: remove unused code & fix some warnings & keep non-alpha sum (#44)
jasperzhong Jul 21, 2020
ac1e9a3
dithering: add max normalization support (#45)
jasperzhong Jul 22, 2020
b4ecc6a
reduce: add num threads recommend (#46)
jasperzhong Jul 22, 2020
c132186
fp16: add fp16 support (#47)
jasperzhong Jul 24, 2020
cb221df
hotfix: fix register bugs of 1bit (#48)
jasperzhong Jul 25, 2020
23e38eb
hotfix: fix server SIGSEGV when shutdown (#50)
jasperzhong Jul 28, 2020
c28f84c
topk: fix fatal bugs when k > 1 (#51)
jasperzhong Jul 29, 2020
3068e31
dithering: fix natural dithering bug (#52)
jasperzhong Jul 29, 2020
8b1218d
sparsification: support factor k (#55)
jasperzhong Jul 30, 2020
ced6f2b
hotfix: fix typo (#56)
jasperzhong Jul 30, 2020
fa7821b
hotfix: add log (#57)
jasperzhong Jul 30, 2020
378cc2f
exp: update cifar100 (#58)
jasperzhong Jul 30, 2020
7dc8d7f
misc: remove recommend omp threads (#59)
jasperzhong Jul 30, 2020
6673b7d
1bit: not need to do wd mom for uncompressed gradients (#61)
jasperzhong Jul 30, 2020
a692fea
hotfix: fix distributed initialization #285
ZiyueHuang Aug 5, 2020
ba60a76
hotfix: merge two buffer (mentioned in #285)
jasperzhong Aug 5, 2020
c6464a9
Revert "hotfix: fix distributed initialization #285"
jasperzhong Aug 6, 2020
40114f3
hotfix: remove unnecessary code
jasperzhong Aug 6, 2020
7875987
test: full test coverage (#53)
jasperzhong Aug 12, 2020
63bbb58
Merge branch 'master' into gradient_compression
jasperzhong Aug 13, 2020
e857315
misc: refactor wdmom
jasperzhong Aug 13, 2020
ec99e82
1bit: use wd_mult
jasperzhong Aug 13, 2020
dfcc670
1bit: update wd mom
jasperzhong Aug 13, 2020
774f49c
mom: nag for uncompressed gradients (#62)
jasperzhong Aug 13, 2020
f894478
hotfix: fix wd mom issue (#63)
jasperzhong Aug 13, 2020
ef6f916
hotfix: update
jasperzhong Aug 13, 2020
1 change: 1 addition & 0 deletions .gitignore
@@ -116,3 +116,4 @@ venv.bak/

# for development
scripts/
exps/
2 changes: 2 additions & 0 deletions byteps/common/common.cc
@@ -100,6 +100,7 @@ int GetCommandType(RequestType requestType, int d) {
return (((m + d) * (m + d + 1)) / 2) + d;
}

#ifndef BYTEPS_BUILDING_SERVER
ncclDataType_t getNcclDataType(DataType dtype) {
switch (dtype) {
case BYTEPS_FLOAT32:
@@ -121,6 +122,7 @@ ncclDataType_t getNcclDataType(DataType dtype) {
}
return ncclFloat32;
}
#endif

int getDataTypeLength(int dtype) {
switch (dtype) {
50 changes: 41 additions & 9 deletions byteps/common/common.h
@@ -31,16 +31,23 @@
#include <vector>

// Add for profiling communication events
#include <fstream>
#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <thread>

#include <chrono>
#include <fstream>
#include <iostream>
#include <queue>
#include <thread>

namespace byteps {
namespace common {
namespace compressor {
struct BPSTensor;
typedef BPSTensor tensor_t;
class Compressor;
class ErrorFeedback;
} // namespace compressor

// Device ID used for CPU.
#define CPU_DEVICE_ID (-1)
@@ -83,8 +90,10 @@ enum QueueType {
COPYD2H,
PCIE_REDUCE,
COORDINATE_PUSH,
COMPRESS,
PUSH,
PULL,
DECOMPRESS,
COPYH2D,
COORDINATE_BROADCAST,
BROADCAST,
@@ -94,10 +103,18 @@ enum QueueType {
const int QueueNum =
(int)QUEUE_NUM_AND_NOT_A_REAL_QUEUE_TYPE_AND_MUST_BE_THE_LAST;

const std::vector<std::string> LogStrings = {
"COORDINATE_REDUCE", "REDUCE", "COPYD2H", "PCIE_REDUCE",
"COORDINATE_PUSH", "PUSH", "PULL", "COPYH2D",
"COORDINATE_BROADCAST", "BROADCAST"};
const std::vector<std::string> LogStrings = {"COORDINATE_REDUCE",
"REDUCE",
"COPYD2H",
"PCIE_REDUCE",
"COORDINATE_PUSH",
"COMPRESS",
"PUSH",
"PULL",
"DECOMPRESS",
"COPYH2D",
"COORDINATE_BROADCAST",
"BROADCAST"};

class Status {
public:
@@ -173,11 +190,17 @@ typedef struct BytePSContext {
std::vector<void*> pcie_cpubuff;
size_t buff_len;
// Used for profiling communication events
std::queue<BPSCommTime *> comm_time;
std::queue<BPSCommTime*> comm_time;
bool profile_flag = false;
int step_cnt = 0;
int local_rank = 0;
std::unordered_map<uint64_t, std::unordered_map<int, std::queue<BPSCommTime *>>> part_comm_time;
std::unordered_map<uint64_t,
std::unordered_map<int, std::queue<BPSCommTime*>>>
part_comm_time;
// Compressor list
std::vector<std::shared_ptr<compressor::Compressor>> compressor_list;
// kwargs
std::unordered_map<std::string, std::string> kwargs;
} BPSContext;

class Tensor {
@@ -233,6 +256,10 @@ struct TensorTableEntry {
std::shared_ptr<std::atomic_int> counter_ptr;
// How many partitions
unsigned int total_partnum = 0;
// Compressor
std::shared_ptr<compressor::Compressor> compressor;
// Compressed
std::shared_ptr<compressor::tensor_t> compressed;
};
using TensorTable = std::unordered_map<std::string, TensorTableEntry>;

@@ -250,6 +277,11 @@ ncclDataType_t getNcclDataType(DataType dtype);

int getDataTypeLength(int dtype);

inline size_t Align(size_t size, int dtype) {
const size_t min_size =
(getDataTypeLength(dtype) * getDataTypeLength(dtype)) * 8;
return size + (min_size - size % min_size) % min_size;
}
} // namespace common
} // namespace byteps
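
The `Align` helper added above rounds a byte size up to a multiple of `(element width)^2 * 8`. The following is a minimal sketch of that arithmetic only. `AlignSketch` and `AlignSketchDemo` are hypothetical names, and the sketch takes the element width in bytes directly instead of calling `getDataTypeLength`.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for the Align(size, dtype) helper in the diff: the
// element width is passed in bytes instead of being looked up from a dtype.
inline std::size_t AlignSketch(std::size_t size, std::size_t elem_bytes) {
  // Same granularity as the diff: (element width squared) * 8 bytes.
  const std::size_t min_size = elem_bytes * elem_bytes * 8;
  // Round size up to the next multiple of min_size; exact multiples are kept.
  return size + (min_size - size % min_size) % min_size;
}

// Usage: with 4-byte elements the granularity is 4 * 4 * 8 = 128 bytes.
inline void AlignSketchDemo() {
  assert(AlignSketch(0, 4) == 0);
  assert(AlignSketch(100, 4) == 128);
  assert(AlignSketch(128, 4) == 128);
  assert(AlignSketch(129, 4) == 256);
}
```

The outer `% min_size` is what keeps sizes that are already aligned unchanged; without it an aligned size would grow by one full block.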

84 changes: 84 additions & 0 deletions byteps/common/compressor/common.h
@@ -0,0 +1,84 @@
// Copyright 2019 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#ifndef BYTEPS_COMPRESSOR_COMMON_H
#define BYTEPS_COMPRESSOR_COMMON_H

#include <unordered_map>

namespace byteps {
namespace common {
namespace compressor {
typedef char byte_t;
/*!
* \brief Tensor type
*/
typedef struct BPSTensor {
byte_t* data;
size_t size;
int dtype;

BPSTensor() : data(nullptr), size(0), dtype(0) {}
BPSTensor(void* data, size_t size = 0, int dtype = 0)
: data(reinterpret_cast<byte_t*>(data)), size(size), dtype(dtype) {}
} tensor_t;

using kwargs_t = std::unordered_map<std::string, std::string>;

#define COMPRESS_IMPL_SWITCH(dtype, func, dst, src, size) \
switch (dtype) { \
case BYTEPS_FLOAT32: \
return func(reinterpret_cast<uint32_t*>(dst), \
reinterpret_cast<const float*>(src), size / sizeof(float)); \
case BYTEPS_FLOAT64: \
return func(reinterpret_cast<uint64_t*>(dst), \
reinterpret_cast<const double*>(src), \
size / sizeof(double)); \
default: \
BPS_CHECK(0) << "Unsupported data type:" << dtype; \
}

#define DECOMPRESS_IMPL_SWITCH(dtype, func, dst, src, compressed_size) \
switch (dtype) { \
case BYTEPS_FLOAT32: \
return func(reinterpret_cast<float*>(dst), \
reinterpret_cast<const uint32_t*>(src), compressed_size); \
case BYTEPS_FLOAT64: \
return func(reinterpret_cast<double*>(dst), \
reinterpret_cast<const uint64_t*>(src), compressed_size); \
default: \
BPS_CHECK(0) << "Unsupported data type:" << dtype; \
}

#define SWITCH_TO_FAST_UPDATE_ERROR_IMPL_SWITCH(dtype, func, dst, src1, src2, \
compressed_size) \
switch (dtype) { \
case BYTEPS_FLOAT32: \
return func(reinterpret_cast<float*>(dst), \
reinterpret_cast<float*>(src1), \
reinterpret_cast<const uint32_t*>(src2), compressed_size); \
case BYTEPS_FLOAT64: \
return func(reinterpret_cast<double*>(dst), \
reinterpret_cast<double*>(src1), \
reinterpret_cast<const uint64_t*>(src2), compressed_size); \
default: \
BPS_CHECK(0) << "Unsupported data type:" << dtype; \
}

} // namespace compressor
} // namespace common
} // namespace byteps

#endif // BYTEPS_COMPRESSOR_COMMON_H
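
The `*_IMPL_SWITCH` macros in this header select a typed implementation from a runtime `dtype` and convert a byte count into an element count. The sketch below shows that dispatch shape in isolation, under stated assumptions: `SketchDType`, `PackSigns`, and `CompressDispatch` are hypothetical names, the sign-packing kernel is an illustrative stand-in rather than any compressor from this PR, and an exception replaces `BPS_CHECK`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>

// Hypothetical dtype tags standing in for BYTEPS_FLOAT32 / BYTEPS_FLOAT64.
enum SketchDType { kFloat32 = 0, kFloat64 = 1 };

// Illustrative kernel (an assumption, not the PR's code): packs one sign bit
// per element (1 = negative) into words of type IndexT and returns the number
// of bytes written to dst.
template <typename IndexT, typename ScalarT>
std::size_t PackSigns(IndexT* dst, const ScalarT* src, std::size_t len) {
  constexpr std::size_t kBits = sizeof(IndexT) * 8;
  const std::size_t words = (len + kBits - 1) / kBits;
  for (std::size_t w = 0; w < words; ++w) dst[w] = 0;
  for (std::size_t i = 0; i < len; ++i) {
    if (src[i] < 0) dst[i / kBits] |= IndexT{1} << (i % kBits);
  }
  return words * sizeof(IndexT);
}

// Same shape as COMPRESS_IMPL_SWITCH: choose the typed instantiation from a
// runtime dtype and pass the element count (bytes / sizeof(element)).
inline std::size_t CompressDispatch(int dtype, void* dst, const void* src,
                                    std::size_t size_bytes) {
  switch (dtype) {
    case kFloat32:
      return PackSigns(reinterpret_cast<std::uint32_t*>(dst),
                       reinterpret_cast<const float*>(src),
                       size_bytes / sizeof(float));
    case kFloat64:
      return PackSigns(reinterpret_cast<std::uint64_t*>(dst),
                       reinterpret_cast<const double*>(src),
                       size_bytes / sizeof(double));
    default:
      throw std::invalid_argument("unsupported data type");
  }
}

// Usage: elements 1 and 3 are negative, so bits 1 and 3 are set (0b1010).
inline void CompressDispatchDemo() {
  float grad[4] = {1.0f, -2.0f, 3.0f, -4.0f};
  std::uint32_t packed[1] = {0};
  assert(CompressDispatch(kFloat32, packed, grad, sizeof(grad)) == 4);
  assert(packed[0] == 0xAu);
}
```

Keeping the switch in one place means each kernel is written once as a template, and supporting another dtype only requires another `case`.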
137 changes: 137 additions & 0 deletions byteps/common/compressor/compressor.h
@@ -0,0 +1,137 @@
// Copyright 2019 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#ifndef BYTEPS_COMPRESSOR_COMPRESSOR_H
#define BYTEPS_COMPRESSOR_COMPRESSOR_H

#include <memory>

#include "../cpu_reducer.h"
#include "common.h"

namespace byteps {
namespace common {
namespace compressor {
/*!
* \brief Compressor interface
* Compressor defines two universal APIs - Compress & Decompress
*
* \par
* The caller does not need to allocate additional memory to store compressed
* data because there is an internal buffer to store the compressed data and the
* pointer will be returned to the caller. Then the caller can send the returned
* compressed data as normal.
*
* \par
* There are two optional features of the compressor - error-feedback &
* momentum. These two features can be added to any common compressor like 1bit
* and topk. To be generic, these two features are also compressors, exposing
* the same API as Compressor. More details can be found in their own files.
*
* \par
* To add a new compressor, developers need to inherit from this class in the
* 'impl' directory. If a new optional feature like error-feedback is needed,
* developers need to use the decorator pattern and add new files in the current
* directory. The existing implementation can be used as a reference.
*
*
* \sa ErrorFeedback, Momentum
*/
class Compressor {
public:
Compressor(size_t size, DataType dtype)
: _size(size),
_dtype(dtype),
_buf(new byte_t[size]),
_cpu_reducer(new CpuReducer(nullptr)){};
virtual ~Compressor() = default;

/*!
* \brief Compress function
*
* \note Except for error-feedback and momentum, the underlying data of input
* should never be changed. This is because input is still used in error
* feedback if enabled.
*
* \note Compressed data should be stored in the buffer of the compressor. So
* it is not an in-place operation.
*
* \param grad gradient tensor, passed by value.
* \return compressed tensor. It is the buffer of the compressor,
* which contains the compressed data. The returned size is the size of
* compressed data.
*/
virtual tensor_t Compress(tensor_t grad) = 0;

/*!
* \brief Decompress function
*
* \note For servers, decompression is not an in-place operation. The
* decompressed result is located in the buffer of the compressor. For workers,
* it is an in-place operation.
*
* \param compressed compressed tensor.
* \return decompressed tensor. For servers, it is the buffer of the
* compressor, which contains the decompressed data. For workers, its pointer
* is the same as the input's, while the size is decompressed size, which is
* also the original size.
*/
virtual tensor_t Decompress(tensor_t compressed) = 0;

/*!
* \brief faster version of `UpdateError` via operation fusion
*
* \par
* This is a helper function implemented by each compressor. If defined,
* `ErrorFeedback` will use this function instead of the default `UpdateError`
* function implemented in error_feedback.cc. If undefined, the default
* `UpdateError` will be used.
*
* \par
* Typically `UpdateError` needs to decompress and do a subtraction. But for
* most compressors, the step of decompression can be avoided. For example,
* for the topk compressor, `UpdateError` can be simplified in this way:
* 1. e <- p (e is the error and p is the corrected gradient)
* 2. zero-fill e at the selected k indices
*
* This is a fusion of the original decompression and subtraction. It is
* optional to override.
*
* \param error error
* \param corrected gradient corrected with error
* \param compressed compressed gradient
*/
virtual void FastUpdateError(tensor_t error, tensor_t corrected,
tensor_t compressed) {
BPS_LOG(FATAL) << "FastUpdateError is not implemented";
};

/*! \brief buffer to store compressed grad */
std::unique_ptr<byte_t[]> _buf;

/*! \brief original size */
size_t _size;

DataType _dtype;

/*! \brief CPU reducer */
std::unique_ptr<CpuReducer> _cpu_reducer;
};

} // namespace compressor
} // namespace common
} // namespace byteps

#endif // BYTEPS_COMPRESSOR_COMPRESSOR_H
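
The `FastUpdateError` comment above describes replacing "decompress, then subtract" with "copy, then zero-fill" for a top-k compressor. The sketch below illustrates why the two give the same error vector. It is written under assumptions: it uses `std::vector` and (index, value) pairs rather than the PR's `tensor_t` buffers, and `Sparse`, `TopKCompress`, `TopKDecompress`, `UpdateErrorDefault`, and `UpdateErrorFast` are hypothetical names, not the PR's implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>
#include <numeric>
#include <utility>
#include <vector>

// Hypothetical sparse format: (index, value) pairs for the kept entries.
using Sparse = std::vector<std::pair<std::size_t, float>>;

// Illustrative top-k: keep the k entries of p with the largest magnitude.
inline Sparse TopKCompress(const std::vector<float>& p, std::size_t k) {
  std::vector<std::size_t> idx(p.size());
  std::iota(idx.begin(), idx.end(), std::size_t{0});
  k = std::min(k, p.size());
  std::partial_sort(idx.begin(), idx.begin() + k, idx.end(),
                    [&p](std::size_t a, std::size_t b) {
                      return std::fabs(p[a]) > std::fabs(p[b]);
                    });
  Sparse out;
  for (std::size_t i = 0; i < k; ++i) out.emplace_back(idx[i], p[idx[i]]);
  return out;
}

// Scatter the kept values into a dense vector of length n, zero elsewhere.
inline std::vector<float> TopKDecompress(const Sparse& c, std::size_t n) {
  std::vector<float> dense(n, 0.0f);
  for (const auto& kv : c) dense[kv.first] = kv.second;
  return dense;
}

// Default path: e = p - Decompress(compressed).
inline std::vector<float> UpdateErrorDefault(const std::vector<float>& p,
                                             const Sparse& c) {
  std::vector<float> e = TopKDecompress(c, p.size());
  for (std::size_t i = 0; i < p.size(); ++i) e[i] = p[i] - e[i];
  return e;
}

// Fused path from the comment: e <- p, then zero-fill the selected indices.
inline std::vector<float> UpdateErrorFast(const std::vector<float>& p,
                                          const Sparse& c) {
  std::vector<float> e = p;
  for (const auto& kv : c) e[kv.first] = 0.0f;
  return e;
}

// Usage: with k = 2 the entries -3 and 2 are kept, so the error is what is left.
inline void FastUpdateErrorDemo() {
  const std::vector<float> p = {0.5f, -3.0f, 1.0f, 2.0f};
  const Sparse c = TopKCompress(p, 2);
  assert(UpdateErrorFast(p, c) == UpdateErrorDefault(p, c));
  assert((UpdateErrorFast(p, c) == std::vector<float>{0.5f, 0.0f, 1.0f, 0.0f}));
}
```

The fused version never materializes the dense decompressed tensor, which is the saving the comment refers to; the result is identical because the kept values are exact copies of the corrected gradient at those indices.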