diff --git a/faster_tokenizer/faster_tokenizer/core/encoding.cc b/faster_tokenizer/faster_tokenizer/core/encoding.cc
index c3a7fcbfd50b..980e192abcbc 100644
--- a/faster_tokenizer/faster_tokenizer/core/encoding.cc
+++ b/faster_tokenizer/faster_tokenizer/core/encoding.cc
@@ -22,6 +22,7 @@ limitations under the License. */
 #ifdef WITH_OMP
 #include <omp.h>
 #endif
+
 namespace paddlenlp {
 namespace faster_tokenizer {
 namespace core {
@@ -600,6 +601,23 @@ bool TruncateEncodings(Encoding* encoding,
   return true;
 }
 
+void MultiThreadPadEncodings(std::vector<Encoding>* encodings,
+                             const PadMethod& method,
+                             size_t pad_length,
+                             size_t start_index,
+                             size_t step_index) {
+  auto batch_size = encodings->size();
+  size_t end_index = start_index + step_index;
+  if (end_index > batch_size) end_index = batch_size;
+  for (size_t i = start_index; i < end_index; ++i) {
+    auto& encoding = (*encodings)[i];
+    encoding.Pad(pad_length,
+                 method.pad_id_,
+                 method.pad_token_type_id_,
+                 method.pad_token_,
+                 method.direction_);
+  }
+}
 void PadEncodings(std::vector<Encoding>* encodings, const PadMethod& method) {
   if (encodings == nullptr || encodings->empty()) {
     return;
@@ -619,7 +637,6 @@ void PadEncodings(std::vector<Encoding>* encodings, const PadMethod& method) {
   auto batch_size = encodings->size();
 #ifdef WITH_OMP
 #pragma omp parallel for if (batch_size >= 4 && omp_get_max_threads() > 1)
-#endif
   for (int i = 0; i < batch_size; ++i) {
     auto& encoding = (*encodings)[i];
     encoding.Pad(pad_length,
@@ -628,6 +645,51 @@ void PadEncodings(std::vector<Encoding>* encodings, const PadMethod& method) {
                  method.pad_token_,
                  method.direction_);
   }
+#else
+  auto func = std::bind(&MultiThreadPadEncodings,
+                        encodings,
+                        std::ref(method),
+                        pad_length,
+                        std::placeholders::_1,
+                        std::placeholders::_2);
+  RunMultiThread(func, batch_size);
+#endif
+}
+
+int GetThreadNum(size_t batch_size) {
+  char* env_var = std::getenv("OMP_NUM_THREADS");
+  int thread_num = std::atoi(env_var);
+  if (batch_size <= 0) {
+    thread_num = 1;
+    VLOG(3) << "batch_size <=0, we set OMP_NUM_THREADS = 1";
+  } else {
+    int best_num = ceil(batch_size / 4.0);
+    if (thread_num > best_num) {
+      thread_num = best_num;
+      VLOG(3) << "OMP_NUM_THREADS > batch_size/4, we set OMP_NUM_THREADS = "
+                 "batch_size/4";
+    } else if (thread_num == 0) {
+      thread_num = best_num;
+      VLOG(3) << "OMP_NUM_THREADS == 0, we set OMP_NUM_THREADS = batch_size/4";
+    }
+  }
+  return thread_num;
+}
+
+void RunMultiThread(std::function<void(size_t, size_t)> func,
+                    size_t batch_size) {
+  int thread_num = GetThreadNum(batch_size);
+  std::vector<std::thread> vectorOfThread;
+  size_t start_index = 0;
+  size_t step_index = ceil(batch_size / float(thread_num));
+
+  for (size_t thread_index = 0; thread_index < thread_num; thread_index++) {
+    vectorOfThread.emplace_back(std::thread(func, start_index, step_index));
+    start_index = start_index + step_index;
+  }
+  for (size_t thread_index = 0; thread_index < thread_num; thread_index++) {
+    vectorOfThread[thread_index].join();
+  }
 }
 
 }  // namespace core
diff --git a/faster_tokenizer/faster_tokenizer/core/encoding.h b/faster_tokenizer/faster_tokenizer/core/encoding.h
index 34f5a93bdec2..12a4bb708635 100644
--- a/faster_tokenizer/faster_tokenizer/core/encoding.h
+++ b/faster_tokenizer/faster_tokenizer/core/encoding.h
@@ -21,6 +21,12 @@ limitations under the License. */
 
 #include "faster_tokenizer/core/base.h"
 #include "faster_tokenizer/utils/utils.h"
+#include <cmath>
+#include <cstdlib>
+#include <functional>
+#include <thread>
+using namespace std;
+
 namespace paddlenlp {
 namespace faster_tokenizer {
 namespace core {
@@ -122,6 +128,10 @@ bool FASTERTOKENIZER_DECL TruncateEncodings(Encoding* encoding,
 void FASTERTOKENIZER_DECL PadEncodings(std::vector<Encoding>* encoding,
                                        const PadMethod& method);
 
+int FASTERTOKENIZER_DECL GetThreadNum(size_t batch_size);
+
+void FASTERTOKENIZER_DECL
+RunMultiThread(std::function<void(size_t, size_t)> func, size_t batch_size);
 }  // namespace core
 }  // namespace faster_tokenizer
 }  // namespace paddlenlp
diff --git a/faster_tokenizer/faster_tokenizer/core/tokenizer.cc b/faster_tokenizer/faster_tokenizer/core/tokenizer.cc
index 626910584486..1b6399c4aedf 100644
--- a/faster_tokenizer/faster_tokenizer/core/tokenizer.cc
+++ b/faster_tokenizer/faster_tokenizer/core/tokenizer.cc
@@ -26,6 +26,7 @@ limitations under the License. */
 
 #include "faster_tokenizer/postprocessors/postprocessors.h"
 #include "faster_tokenizer/pretokenizers/pretokenizers.h"
+
 #ifdef WITH_OMP
 #include <omp.h>
 #endif
@@ -248,23 +249,49 @@ void Tokenizer::EncodePairStrings(const EncodeInput& encode_input,
   }
 }
 
+void Tokenizer::MultiThreadEncodeBatchStrings(
+    const std::vector<EncodeInput>& batch_encode_input,
+    std::vector<Encoding>* encodings,
+    bool add_special_tokens,
+    size_t start_index,
+    size_t step_index) const {
+  auto batch_size = batch_encode_input.size();
+  size_t end_index = start_index + step_index;
+  if (end_index > batch_size) end_index = batch_size;
+  for (size_t i = start_index; i < end_index; ++i) {
+    EncodePairStrings(
+        batch_encode_input[i], &(*encodings)[i], add_special_tokens);
+  }
+}
+
 void Tokenizer::EncodeBatchStrings(
     const std::vector<EncodeInput>& batch_encode_input,
     std::vector<Encoding>* encodings,
     bool add_special_tokens) const {
   auto batch_size = batch_encode_input.size();
   encodings->resize(batch_size);
+
 #ifdef WITH_OMP
   // (TODO:zhoushunjie): Simply use the batch size to estimate the workload of
   // tokenization.
   // Use workload to determine whether create omp threads. Need to optimize the
   // workload estimation.
 #pragma omp parallel for if (batch_size >= 4 && omp_get_max_threads() > 1)
-#endif
   for (int i = 0; i < batch_size; ++i) {
     EncodePairStrings(
         batch_encode_input[i], &(*encodings)[i], add_special_tokens);
   }
+#else
+  auto func = std::bind(&Tokenizer::MultiThreadEncodeBatchStrings,
+                        this,
+                        std::ref(batch_encode_input),
+                        encodings,
+                        add_special_tokens,
+                        std::placeholders::_1,
+                        std::placeholders::_2);
+  RunMultiThread(func, batch_size);
+#endif
+
   if (use_padding_) {
     PadEncodings(encodings, pad_method_);
   }
@@ -289,6 +316,23 @@ void Tokenizer::EncodePairStringsCharOffsets(const EncodeInput& encode_input,
   PostProcess(&encoding, &pair_encoding, add_special_tokens, encodings);
 }
 
+void Tokenizer::MultiThreadEncodeBatchStringsCharOffsets(
+    const std::vector<EncodeInput>& batch_encode_input,
+    std::vector<Encoding>* encodings,
+    bool add_special_tokens,
+    size_t start_index,
+    size_t step_index) const {
+  auto batch_size = batch_encode_input.size();
+  size_t end_index = start_index + step_index;
+  if (end_index > batch_size) end_index = batch_size;
+  for (size_t i = start_index; i < end_index; ++i) {
+    Encoding encoding;
+    EncodePairStringsCharOffsets(
+        batch_encode_input[i], &encoding, add_special_tokens);
+    (*encodings)[i] = std::move(encoding);
+  }
+}
+
 void Tokenizer::EncodeBatchStringsCharOffsets(
     const std::vector<EncodeInput>& batch_encode_input,
     std::vector<Encoding>* encodings,
@@ -301,13 +345,23 @@ void Tokenizer::EncodeBatchStringsCharOffsets(
   // Use workload to determine whether create omp threads. Need to optimize the
   // workload estimation.
 #pragma omp parallel for if (batch_size >= 4 && omp_get_max_threads() > 1)
-#endif
   for (int i = 0; i < batch_size; ++i) {
     Encoding encoding;
     EncodePairStringsCharOffsets(
         batch_encode_input[i], &encoding, add_special_tokens);
     (*encodings)[i] = std::move(encoding);
   }
+#else
+  auto func = std::bind(&Tokenizer::MultiThreadEncodeBatchStringsCharOffsets,
+                        this,
+                        std::ref(batch_encode_input),
+                        encodings,
+                        add_special_tokens,
+                        std::placeholders::_1,
+                        std::placeholders::_2);
+  RunMultiThread(func, batch_size);
+#endif
+
   if (use_padding_) {
     PadEncodings(encodings, pad_method_);
   }
@@ -404,11 +458,27 @@ void Tokenizer::Decode(const std::vector<uint32_t>& token_ids,
   }
 }
 
+
+void Tokenizer::MultiThreadDecodeBatch(
+    const std::vector<std::vector<uint32_t>>& batch_token_ids,
+    std::vector<std::string>* results,
+    bool skip_special_tokens,
+    size_t start_index,
+    size_t step_index) const {
+  auto batch_size = batch_token_ids.size();
+  size_t end_index = start_index + step_index;
+  if (end_index > batch_size) end_index = batch_size;
+  for (size_t i = start_index; i < end_index; ++i) {
+    Decode(batch_token_ids[i], &(*results)[i], skip_special_tokens);
+  }
+}
+
 void Tokenizer::DecodeBatch(
     const std::vector<std::vector<uint32_t>>& batch_token_ids,
     std::vector<std::string>* results,
     bool skip_special_tokens) const {
-  results->resize(batch_token_ids.size());
+  auto batch_size = batch_token_ids.size();
+  results->resize(batch_size);
 #ifdef WITH_OMP
   // (TODO:zhoushunjie): Simply use the batch size to estimate the workload of
   // tokenization.
@@ -416,10 +486,19 @@ void Tokenizer::DecodeBatch(
   // workload estimation.
 #pragma omp parallel for if (batch_token_ids.size() >= 4 && \
                              omp_get_num_threads() > 1)
-#endif
   for (int i = 0; i < batch_token_ids.size(); ++i) {
     Decode(batch_token_ids[i], &(*results)[i], skip_special_tokens);
   }
+#else
+  auto func = std::bind(&Tokenizer::MultiThreadDecodeBatch,
+                        this,
+                        std::ref(batch_token_ids),
+                        results,
+                        skip_special_tokens,
+                        std::placeholders::_1,
+                        std::placeholders::_2);
+  RunMultiThread(func, batch_size);
+#endif
 }
 
 bool Tokenizer::GetUseTruncation() const { return use_truncation_; }
diff --git a/faster_tokenizer/faster_tokenizer/core/tokenizer.h b/faster_tokenizer/faster_tokenizer/core/tokenizer.h
index bf317efe1b98..d709cc5a5c6e 100644
--- a/faster_tokenizer/faster_tokenizer/core/tokenizer.h
+++ b/faster_tokenizer/faster_tokenizer/core/tokenizer.h
@@ -160,10 +160,24 @@ class FASTERTOKENIZER_DECL Tokenizer {
                          bool add_special_tokens,
                          Encoding* result_encoding) const;
 
+  void MultiThreadEncodeBatchStrings(
+      const std::vector<EncodeInput>& batch_encode_input,
+      std::vector<Encoding>* encodings,
+      bool add_special_tokens,
+      size_t start_index,
+      size_t step_index) const;
+
   void EncodeBatchStrings(const std::vector<EncodeInput>& batch_encode_input,
                           std::vector<Encoding>* encodings,
                           bool add_special_tokens = true) const;
 
+  void MultiThreadEncodeBatchStringsCharOffsets(
+      const std::vector<EncodeInput>& batch_encode_input,
+      std::vector<Encoding>* encodings,
+      bool add_special_tokens,
+      size_t start_index,
+      size_t step_index) const;
+
   void EncodeBatchStringsCharOffsets(
       const std::vector<EncodeInput>& batch_encode_input,
       std::vector<Encoding>* encodings,
@@ -194,6 +208,12 @@ class FASTERTOKENIZER_DECL Tokenizer {
   void Decode(const std::vector<uint32_t>& token_ids,
               std::string* result,
               bool skip_special_tokens = true) const;
+  void MultiThreadDecodeBatch(
+      const std::vector<std::vector<uint32_t>>& batch_token_ids,
+      std::vector<std::string>* results,
+      bool skip_special_tokens,
+      size_t start_index,
+      size_t step_index) const;
   void DecodeBatch(const std::vector<std::vector<uint32_t>>& batch_token_ids,
                    std::vector<std::string>* results,
                    bool skip_special_tokens = true) const;
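Note on the non-OpenMP fallback introduced above: GetThreadNum caps the worker count at roughly batch_size / 4 (driven by OMP_NUM_THREADS), and RunMultiThread splits the batch into chunks of ceil(batch_size / thread_num) elements, handing each std::thread a [start_index, start_index + step_index) range. The following is a minimal standalone sketch of that partitioning scheme for reference only; it is not part of the patch, the *Sketch names are invented for illustration, and it adds a null check for an unset OMP_NUM_THREADS that the patch itself does not perform.

#include <algorithm>
#include <cmath>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <thread>
#include <vector>

int GetThreadNumSketch(size_t batch_size) {
  // Mirrors GetThreadNum: read OMP_NUM_THREADS and cap it at ceil(batch_size / 4).
  // The null check for an unset variable is added here; the patch calls std::atoi directly.
  const char* env_var = std::getenv("OMP_NUM_THREADS");
  int thread_num = env_var ? std::atoi(env_var) : 0;
  if (batch_size == 0) return 1;
  int best_num = static_cast<int>(std::ceil(batch_size / 4.0));
  if (thread_num <= 0 || thread_num > best_num) thread_num = best_num;
  return thread_num;
}

void RunMultiThreadSketch(std::function<void(size_t, size_t)> func,
                          size_t batch_size) {
  // Mirrors RunMultiThread: fixed-size chunks, one std::thread per chunk, then join all.
  int thread_num = GetThreadNumSketch(batch_size);
  size_t step_index = static_cast<size_t>(
      std::ceil(batch_size / static_cast<float>(thread_num)));
  std::vector<std::thread> threads;
  size_t start_index = 0;
  for (int i = 0; i < thread_num; ++i) {
    threads.emplace_back(func, start_index, step_index);
    start_index += step_index;
  }
  for (auto& t : threads) t.join();
}

int main() {
  std::vector<int> data(10, 0);
  // Each worker handles indices [start, min(start + step, size)), the same range
  // clamping done by MultiThreadPadEncodings / MultiThreadEncodeBatchStrings.
  RunMultiThreadSketch(
      [&data](size_t start, size_t step) {
        size_t end = std::min(start + step, data.size());
        for (size_t i = start; i < end; ++i) data[i] = static_cast<int>(i) * 2;
      },
      data.size());
  for (int v : data) std::cout << v << " ";
  std::cout << "\n";
  return 0;
}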