diff --git a/src/cudadecoder/Makefile b/src/cudadecoder/Makefile index 934d5bcc570..345ca746aaa 100644 --- a/src/cudadecoder/Makefile +++ b/src/cudadecoder/Makefile @@ -19,7 +19,7 @@ OBJFILES = cuda-decoder.o cuda-decoder-kernels.o cuda-fst.o \ batched-threaded-nnet3-cuda-pipeline2.o \ batched-static-nnet3.o batched-static-nnet3-kernels.o \ cuda-online-pipeline-dynamic-batcher.o decodable-cumatrix.o \ - lattice-postprocessor.o + cuda-pipeline-common.o lattice-postprocessor.o LIBNAME = kaldi-cudadecoder diff --git a/src/cudadecoder/batched-threaded-nnet3-cuda-online-pipeline.cc b/src/cudadecoder/batched-threaded-nnet3-cuda-online-pipeline.cc index 588760a6606..cf5b1e7a888 100644 --- a/src/cudadecoder/batched-threaded-nnet3-cuda-online-pipeline.cc +++ b/src/cudadecoder/batched-threaded-nnet3-cuda-online-pipeline.cc @@ -21,12 +21,13 @@ #include "cudadecoder/batched-threaded-nnet3-cuda-online-pipeline.h" +#include + #include #include #include -#include - +#include "cudadecoder/batched-threaded-nnet3-cuda-online-pipeline.h" #include "feat/feature-window.h" #include "lat/lattice-functions.h" #include "nnet3/nnet-utils.h" @@ -74,12 +75,14 @@ void BatchedThreadedNnet3CudaOnlinePipeline::AllocateAndInitializeData( d_all_log_posteriors_.Resize(max_batch_size_ * output_frames_per_chunk_, trans_model_->NumPdfs(), kUndefined); available_channels_.resize(config_.num_channels); + channels_info_.reset(new std::vector(config_.num_channels)); + is_end_of_segment_.resize(config_.max_batch_size); + is_end_of_stream_.resize(config_.max_batch_size); lattice_callbacks_.reserve(config_.num_channels); best_path_callbacks_.reserve(config_.num_channels); std::iota(available_channels_.begin(), available_channels_.end(), 0); // 0,1,2,3.. corr_id2channel_.reserve(config_.num_channels); - channel_frame_offset_.resize(config_.num_channels, 0); // Feature extraction if (config_.use_gpu_feature_extraction) { @@ -111,8 +114,17 @@ void BatchedThreadedNnet3CudaOnlinePipeline::AllocateAndInitializeData( void BatchedThreadedNnet3CudaOnlinePipeline::SetLatticeCallback( CorrelationID corr_id, const LatticeCallback &callback) { - std::lock_guard lk(map_callbacks_m_); - lattice_callbacks_.insert({corr_id, callback}); + SegmentedResultsCallback segmented_callback = + [=](SegmentedLatticeCallbackParams params) { + if (params.results.empty()) { + KALDI_WARN << "Empty result for callback"; + return; + } + CompactLattice &clat = params.results[0].GetLatticeResult(); + callback(clat); + }; + SetLatticeCallback(corr_id, segmented_callback, + CudaPipelineResult::RESULT_TYPE_LATTICE); } void BatchedThreadedNnet3CudaOnlinePipeline::SetBestPathCallback( @@ -121,6 +133,13 @@ void BatchedThreadedNnet3CudaOnlinePipeline::SetBestPathCallback( best_path_callbacks_.insert({corr_id, callback}); } +void BatchedThreadedNnet3CudaOnlinePipeline::SetLatticeCallback( + CorrelationID corr_id, const SegmentedResultsCallback &callback, + const int result_type) { + std::lock_guard lk(map_callbacks_m_); + lattice_callbacks_.insert({corr_id, {callback, result_type}}); +} + bool BatchedThreadedNnet3CudaOnlinePipeline::TryInitCorrID( CorrelationID corr_id, int32 wait_for_us) { bool inserted; @@ -167,7 +186,8 @@ bool BatchedThreadedNnet3CudaOnlinePipeline::TryInitCorrID( new OnlineNnet2FeaturePipeline(*feature_info_)); } - channel_frame_offset_[ichannel] = 0; + (*channels_info_)[ichannel].Reset(); + return true; } @@ -325,46 +345,16 @@ void BatchedThreadedNnet3CudaOnlinePipeline::DecodeBatch( if (!partial_hypotheses_) partial_hypotheses_ = 
&partial_hypotheses_buf_; if (!end_points_) end_points_ = &end_points_buf_; } + if (config_.reset_on_endpoint) { + if (!end_points_) end_points_ = &end_points_buf_; + } } - list_channels_first_chunk_.clear(); - for (size_t i = 0; i < is_first_chunk.size(); ++i) { - if (is_first_chunk[i]) list_channels_first_chunk_.push_back((*channels)[i]); - } - if (!list_channels_first_chunk_.empty()) - cuda_decoder_->InitDecoding(list_channels_first_chunk_); - RunNnet3(*channels, d_features, features_frame_stride, n_input_frames_valid, is_first_chunk, is_last_chunk, d_ivectors); - if (partial_hypotheses_) { - // We're going to have to generate the partial hypotheses - if (word_syms_ == nullptr) { - KALDI_ERR << "You need to set --word-symbol-table to use " - << "partial hypotheses"; - } - cuda_decoder_->AllowPartialHypotheses(); - } - if (end_points_) cuda_decoder_->AllowEndpointing(); - - RunDecoder(*channels); - if (partial_hypotheses_) { - partial_hypotheses_->resize(channels_.size()); - for (size_t i = 0; i < channels_.size(); ++i) { - PartialHypothesis *partial_hypothesis; - ChannelId ichannel = channels_[i]; - cuda_decoder_->GetPartialHypothesis(ichannel, &partial_hypothesis); - (*partial_hypotheses_)[i] = &partial_hypothesis->out_str; - } - } + RunDecoder(*channels, is_first_chunk); - if (end_points_) { - end_points_->resize(channels_.size()); - for (size_t i = 0; i < channels_.size(); ++i) { - ChannelId ichannel = channels_[i]; - (*end_points_)[i] = cuda_decoder_->EndpointDetected(ichannel); - } - } RunCallbacksAndFinalize(corr_ids, *channels, is_last_chunk); nvtxRangePop(); } @@ -411,30 +401,33 @@ void BatchedThreadedNnet3CudaOnlinePipeline::ComputeOneFeature(int element) { n_compute_features_not_done_.fetch_sub(1, std::memory_order_release); } -void BatchedThreadedNnet3CudaOnlinePipeline::RunCallbacksAndFinalize( +void BatchedThreadedNnet3CudaOnlinePipeline::RunBestPathCallbacks( const std::vector &corr_ids, const std::vector &channels, const std::vector &is_last_chunk) { - // Best path callbacks - { - std::lock_guard lk(map_callbacks_m_); - if (!best_path_callbacks_.empty() && partial_hypotheses_ && end_points_) { - for (int i = 0; i < corr_ids.size(); ++i) { - CorrelationID corr_id = corr_ids[i]; - auto it_callback = best_path_callbacks_.find(corr_id); - if (it_callback != best_path_callbacks_.end()) { - // We have a best path callback for this corr_id - const std::string &best_path = *((*partial_hypotheses_)[i]); - bool partial = !is_last_chunk[i]; - bool endpoint_detected = (*end_points_)[i]; - // Run them on main thread - We could move the best path callbacks on - // the threadpool - it_callback->second(best_path, partial, endpoint_detected); - if (is_last_chunk[i]) best_path_callbacks_.erase(it_callback); - } + std::lock_guard lk(map_callbacks_m_); + if (!best_path_callbacks_.empty() && partial_hypotheses_ && end_points_) { + for (int i = 0; i < corr_ids.size(); ++i) { + CorrelationID corr_id = corr_ids[i]; + auto it_callback = best_path_callbacks_.find(corr_id); + if (it_callback != best_path_callbacks_.end()) { + // We have a best path callback for this corr_id + const std::string &best_path = *((*partial_hypotheses_)[i]); + bool partial = !is_end_of_segment_[i]; + bool endpoint_detected = (*end_points_)[i]; + // Run them on main thread - We could move the best path callbacks on + // the threadpool + it_callback->second(best_path, partial, endpoint_detected); + + // If end of stream, clean up + if (is_end_of_stream_[i]) best_path_callbacks_.erase(it_callback); } } } +} +void 
BatchedThreadedNnet3CudaOnlinePipeline::RunLatticeCallbacks( + const std::vector &corr_ids, + const std::vector &channels, const std::vector &is_last_chunk) { list_channels_last_chunk_.clear(); list_corr_id_last_chunk_.clear(); list_lattice_callbacks_last_chunk_.clear(); @@ -442,25 +435,53 @@ void BatchedThreadedNnet3CudaOnlinePipeline::RunCallbacksAndFinalize( std::lock_guard lk_callbacks(map_callbacks_m_); std::lock_guard lk_channels(available_channels_m_); for (int i = 0; i < is_last_chunk.size(); ++i) { - if (is_last_chunk[i]) { - ChannelId ichannel = channels[i]; - CorrelationID corr_id = corr_ids[i]; - - bool has_lattice_callback = false; - decltype(lattice_callbacks_.end()) it_lattice_callback; - if (!lattice_callbacks_.empty()) { - it_lattice_callback = lattice_callbacks_.find(corr_id); - has_lattice_callback = - (it_lattice_callback != lattice_callbacks_.end()); - } + // Only generating a lattice at end of segments + if (!is_end_of_segment_[i]) continue; + + ChannelId ichannel = channels[i]; + CorrelationID corr_id = corr_ids[i]; + ChannelInfo &channel_info = (*channels_info_)[ichannel]; + + // End of segment, so we'll reset the decoder + // We can only do it after the lattice has been generated + // We'll check can_reset_decoder before resetting this channel + // In practice we are decoding batches multiple times faster than + // realtime, so we shouldn't have to wait on can_reset_decoder + channel_info.must_reset_decoder = true; + channel_info.can_reset_decoder.store(false); + ++channel_info.segmentid; + + // Used by FinalizeDecoding to know if we should cleanup + bool has_lattice_callback = false; + decltype(lattice_callbacks_.end()) it_lattice_callback; + if (!lattice_callbacks_.empty()) { + it_lattice_callback = lattice_callbacks_.find(corr_id); + has_lattice_callback = + (it_lattice_callback != lattice_callbacks_.end()); + } + if (has_lattice_callback) { + std::unique_ptr lattice_callback( + new CallbackWithOptions(it_lattice_callback->second)); + // We will trigger this callback + lattice_callback->is_last_segment = is_end_of_stream_[i]; + lattice_callback->segment_id = channel_info.segmentid; + list_channels_last_chunk_.push_back(ichannel); + list_corr_id_last_chunk_.push_back(corr_id); + list_lattice_callbacks_last_chunk_.push_back( + std::move(lattice_callback)); + } + + // If we are end of stream (last segment) + // we need to do some cleanup + if (is_end_of_stream_[i]) { if (has_lattice_callback) { - LatticeCallback *lattice_callback = - new LatticeCallback(std::move(it_lattice_callback->second)); + // We need to generate a lattice, so we cannot free the channel right + // away indicating that this is the last segment so that we know that + // we need to free the channel also erasing the callback lattice_callbacks_.erase(it_lattice_callback); - list_channels_last_chunk_.push_back(ichannel); - list_corr_id_last_chunk_.push_back(corr_id); - list_lattice_callbacks_last_chunk_.push_back(lattice_callback); } else { + // We don't have any callback to run. + // So freeing up the channel now // All done with this corr_ids. 
Cleaning up available_channels_.push_back(ichannel); int32 ndeleted = corr_id2channel_.erase(corr_id); @@ -488,11 +509,41 @@ void BatchedThreadedNnet3CudaOnlinePipeline::RunCallbacksAndFinalize( // delete data used for decoding that corr_id for (int32 i = 0; i < list_channels_last_chunk_.size(); ++i) { uint64_t ichannel = list_channels_last_chunk_[i]; - LatticeCallback *lattice_callback = list_lattice_callbacks_last_chunk_[i]; - thread_pool_->Push( - {&BatchedThreadedNnet3CudaOnlinePipeline::FinalizeDecodingWrapper, this, - ichannel, static_cast(lattice_callback)}); + ChannelInfo &channel_info = (*channels_info_)[ichannel]; + bool q_was_empty; + { + std::lock_guard lk(channel_info.mutex); + q_was_empty = channel_info.queue.empty(); + channel_info.queue.push(std::move(list_lattice_callbacks_last_chunk_[i])); + } + if (q_was_empty) { + // If q is not empty, it means we already have a task in the threadpool + // for that channel it is important to run those task in FIFO order if + // empty, run a new task + thread_pool_->Push( + {&BatchedThreadedNnet3CudaOnlinePipeline::FinalizeDecodingWrapper, + this, ichannel, /* ignored */ nullptr}); + } + } +} + +void BatchedThreadedNnet3CudaOnlinePipeline::RunCallbacksAndFinalize( + const std::vector &corr_ids, + const std::vector &channels, const std::vector &is_last_chunk) { + // Reading endpoints, figuring out is_end_of_segment_ + for (size_t i = 0; i < is_last_chunk.size(); ++i) { + bool endpoint_detected = false; + if (config_.reset_on_endpoint) { + KALDI_ASSERT(end_points_); + endpoint_detected = (*end_points_)[i]; + } + is_end_of_segment_[i] = endpoint_detected || is_last_chunk[i]; + is_end_of_stream_[i] = is_last_chunk[i]; } + + RunBestPathCallbacks(corr_ids, channels, is_last_chunk); + + RunLatticeCallbacks(corr_ids, channels, is_last_chunk); } void BatchedThreadedNnet3CudaOnlinePipeline::ListIChannelsInBatch( @@ -521,11 +572,79 @@ void BatchedThreadedNnet3CudaOnlinePipeline::RunNnet3( &d_all_log_posteriors_, &all_frames_log_posteriors_); } +void BatchedThreadedNnet3CudaOnlinePipeline::InitDecoding( + const std::vector &channels, const std::vector &is_first_chunk) { + init_decoding_list_channels_.clear(); + for (size_t i = 0; i < is_first_chunk.size(); ++i) { + int ichannel = channels[i]; + ChannelInfo &channel_info = (*channels_info_)[ichannel]; + + bool should_reset_decoder = is_first_chunk[i]; + + // If reset_on_endpoint is set, we might need to reset channels even if + // is_first_chunk is false + if (channel_info.must_reset_decoder) { + // Making sure the last ConcurrentGetRawLatticeSingleChannel has completed + // on this channel + // It shouldn't trigger in practice - pipeline runs multiple time faster + // than realtime + while (!channel_info.can_reset_decoder.load()) { + usleep(kSleepForChannelAvailable); // TODO + } + should_reset_decoder = true; + channel_info.must_reset_decoder = false; + + // Before resetting the channel, saving the offset of the next segment + channel_info.segment_offset_seconds += + cuda_decoder_->NumFramesDecoded(ichannel) * + GetDecoderFrameShiftSeconds(); + } + + if (should_reset_decoder) + init_decoding_list_channels_.push_back((channels)[i]); + } + + if (!init_decoding_list_channels_.empty()) + cuda_decoder_->InitDecoding(init_decoding_list_channels_); +} + void BatchedThreadedNnet3CudaOnlinePipeline::RunDecoder( - const std::vector &channels) { + const std::vector &channels, const std::vector &is_first_chunk) { + if (partial_hypotheses_) { + // We're going to have to generate the partial hypotheses + if 
(word_syms_ == nullptr) { + KALDI_ERR << "You need to set --word-symbol-table to use " + << "partial hypotheses"; + } + cuda_decoder_->AllowPartialHypotheses(); + } + if (end_points_) cuda_decoder_->AllowEndpointing(); + + // Will check which channels needs to be init (or reset), + // and call the decoder's InitDecoding + InitDecoding(channels, is_first_chunk); + for (int iframe = 0; iframe < all_frames_log_posteriors_.size(); ++iframe) { cuda_decoder_->AdvanceDecoding(all_frames_log_posteriors_[iframe]); } + + if (partial_hypotheses_) { + partial_hypotheses_->resize(channels_.size()); + for (size_t i = 0; i < channels_.size(); ++i) { + PartialHypothesis *partial_hypothesis; + ChannelId ichannel = channels_[i]; + cuda_decoder_->GetPartialHypothesis(ichannel, &partial_hypothesis); + (*partial_hypotheses_)[i] = &partial_hypothesis->out_str; + } + } + + if (end_points_) { + end_points_->resize(channels_.size()); + for (size_t i = 0; i < channels_.size(); ++i) { + ChannelId ichannel = channels_[i]; + (*end_points_)[i] = cuda_decoder_->EndpointDetected(ichannel); + } + } } void BatchedThreadedNnet3CudaOnlinePipeline::ReadParametersFromModel() { @@ -554,68 +673,116 @@ void BatchedThreadedNnet3CudaOnlinePipeline::ReadParametersFromModel() { output_frames_per_chunk_ = cuda_nnet3_->GetNOutputFramesPerChunk(); } -void BatchedThreadedNnet3CudaOnlinePipeline::FinalizeDecoding( - int32 ichannel, const LatticeCallback *callback) { - Lattice lat; - cuda_decoder_->ConcurrentGetRawLatticeSingleChannel(ichannel, &lat); +void BatchedThreadedNnet3CudaOnlinePipeline::FinalizeDecoding(int32 ichannel) { + ChannelInfo &channel_info = (*channels_info_)[ichannel]; + + while (true) { + std::unique_ptr callback_w_options; + { + std::lock_guard lk(channel_info.mutex); + // This is either the first iter of the loop, or we have tested for + // empty() at end of previous iter + KALDI_ASSERT(!channel_info.queue.empty()); + callback_w_options = std::move(channel_info.queue.front()); + // we'll pop when done. this is used to track when we have a worker + // running in thread pool + } - // Getting the channel callback now, we're going to free that channel - // Done with this channel. Making it available again - { - std::lock_guard lk(available_channels_m_); - available_channels_.push_back(ichannel); - } + Lattice lat; + cuda_decoder_->ConcurrentGetRawLatticeSingleChannel(ichannel, &lat); + + BaseFloat segment_offset_seconds = channel_info.segment_offset_seconds; + + if (callback_w_options->is_last_segment) { + // If this is the last segment, we can make that channel available again + std::lock_guard lk(available_channels_m_); + available_channels_.push_back(ichannel); + } else { + // If this is the end of a segment but not end of stream, we keep the + // channel open, but we will reset the decoder. Saying that we can reset + // it now. 
+ (*channels_info_)[ichannel].can_reset_decoder.store( + true, std::memory_order_release); + } - // If necessary, determinize the lattice - CompactLattice dlat; - if (config_.determinize_lattice) { - DeterminizeLatticePhonePrunedWrapper(*trans_model_, &lat, - config_.decoder_opts.lattice_beam, - &dlat, config_.det_opts); - } else { - ConvertLattice(lat, &dlat); - } + // If necessary, determinize the lattice + CompactLattice dlat; + if (config_.determinize_lattice) { + DeterminizeLatticePhonePrunedWrapper(*trans_model_, &lat, + config_.decoder_opts.lattice_beam, + &dlat, config_.det_opts); + } else { + ConvertLattice(lat, &dlat); + } - if (dlat.NumStates() > 0) { - // Used for debugging - if (false && word_syms_) { - CompactLattice best_path_clat; - CompactLatticeShortestPath(dlat, &best_path_clat); - - Lattice best_path_lat; - ConvertLattice(best_path_clat, &best_path_lat); - - std::vector alignment; - std::vector words; - LatticeWeight weight; - GetLinearSymbolSequence(best_path_lat, &alignment, &words, &weight); - std::ostringstream oss; - for (size_t i = 0; i < words.size(); i++) { - std::string s = word_syms_->Find(words[i]); - if (s == "") oss << "Word-id " << words[i] << " not in symbol table."; - oss << s << " "; + if (dlat.NumStates() > 0) { + // Used for debugging + if (false && word_syms_) { + CompactLattice best_path_clat; + CompactLatticeShortestPath(dlat, &best_path_clat); + + Lattice best_path_lat; + ConvertLattice(best_path_clat, &best_path_lat); + + std::vector alignment; + std::vector words; + LatticeWeight weight; + GetLinearSymbolSequence(best_path_lat, &alignment, &words, &weight); + std::ostringstream oss; + for (size_t i = 0; i < words.size(); i++) { + std::string s = word_syms_->Find(words[i]); + if (s == "") oss << "Word-id " << words[i] << " not in symbol table."; + oss << s << " "; + } + { + std::lock_guard lk(stdout_m_); + KALDI_LOG << "OUTPUT: " << oss.str(); + } } - { - std::lock_guard lk(stdout_m_); - KALDI_LOG << "OUTPUT: " << oss.str(); + } + + // if ptr set and if callback func callable + if (callback_w_options) { + const SegmentedResultsCallback &callback = callback_w_options->callback; + if (callback) { + SegmentedLatticeCallbackParams params; + params.results.emplace_back(); + CudaPipelineResult &result = params.results[0]; + if (callback_w_options->is_last_segment) result.SetAsLastSegment(); + result.SetSegmentID(callback_w_options->segment_id); + result.SetTimeOffsetSeconds(segment_offset_seconds); + + SetResultUsingLattice(dlat, callback_w_options->result_type, + lattice_postprocessor_, &result); + (callback)(params); } } - } - // if ptr set and if callback func callable - if (callback && *callback) { - (*callback)(dlat); - delete callback; + // Callback has been run + n_lattice_callbacks_not_done_.fetch_sub(1, std::memory_order_release); + + // pop. This marks task as done + { + std::lock_guard lk(channel_info.mutex); + channel_info.queue.pop(); + // Need to stop now. 
If the queue is seen as empty, we'll assume we have + // no worker running in threadpool the mutex will get unlocked in + // destructor + if (channel_info.queue.empty()) break; + } } +} - n_lattice_callbacks_not_done_.fetch_sub(1, std::memory_order_release); +void BatchedThreadedNnet3CudaOnlinePipeline::SetLatticePostprocessor( + const std::shared_ptr &lattice_postprocessor) { + lattice_postprocessor_ = lattice_postprocessor; + lattice_postprocessor_->SetDecoderFrameShift(GetDecoderFrameShiftSeconds()); + lattice_postprocessor_->SetTransitionModel(&GetTransitionModel()); } void BatchedThreadedNnet3CudaOnlinePipeline::WaitForLatticeCallbacks() { - while (n_lattice_callbacks_not_done_.load() != 0) - Sleep(kSleepForCallBack); + while (n_lattice_callbacks_not_done_.load() != 0) Sleep(kSleepForCallBack); } - } // namespace cuda_decoder } // namespace kaldi diff --git a/src/cudadecoder/batched-threaded-nnet3-cuda-online-pipeline.h b/src/cudadecoder/batched-threaded-nnet3-cuda-online-pipeline.h index b07ac0e5e83..36741d8074c 100644 --- a/src/cudadecoder/batched-threaded-nnet3-cuda-online-pipeline.h +++ b/src/cudadecoder/batched-threaded-nnet3-cuda-online-pipeline.h @@ -28,6 +28,8 @@ #include "base/kaldi-utils.h" #include "cudadecoder/batched-static-nnet3.h" #include "cudadecoder/cuda-decoder.h" +#include "cudadecoder/cuda-pipeline-common.h" +#include "cudadecoder/lattice-postprocessor.h" #include "cudadecoder/thread-pool-light.h" #include "cudafeat/online-batched-feature-pipeline-cuda.h" #include "feat/wave-reader.h" @@ -64,7 +66,8 @@ struct BatchedThreadedNnet3CudaOnlinePipelineConfig { num_worker_threads(-1), determinize_lattice(true), num_decoder_copy_threads(2), - use_gpu_feature_extraction(true) {} + use_gpu_feature_extraction(true), + reset_on_endpoint(false) {} void Register(OptionsItf *po) { po->Register("max-batch-size", &max_batch_size, "The maximum execution batch size. " @@ -85,6 +88,9 @@ struct BatchedThreadedNnet3CudaOnlinePipelineConfig { "the host to host copies."); po->Register("gpu-feature-extract", &use_gpu_feature_extraction, "Use GPU feature extraction"); + po->Register( + "reset-on-endpoint", &reset_on_endpoint, + "Reset a decoder channel when endpoint detected. Do not close stream"); feature_opts.Register(po); decoder_opts.Register(po); @@ -97,6 +103,7 @@ struct BatchedThreadedNnet3CudaOnlinePipelineConfig { bool determinize_lattice; int num_decoder_copy_threads; bool use_gpu_feature_extraction; + bool reset_on_endpoint; OnlineNnet2FeaturePipelineConfig feature_opts; CudaDecoderConfig decoder_opts; @@ -165,16 +172,17 @@ class BatchedThreadedNnet3CudaOnlinePipeline { void SetLatticeCallback(CorrelationID corr_id, const LatticeCallback &callback); - // Chunk of one utterance. 
We receive batches of those chunks through - // DecodeBatch - // Contains pointers to that chunk, the corresponding correlation ID, - // and whether that chunk is the last one for that utterance - struct UtteranceChunk { - CorrelationID corr_id; - SubVector wave_samples; - bool last_chunk; // sets to true if last chunk for that - // utterance - }; + // Set callback using SegmentedResultsCallback + // Able to run lattice postprocessor and generate CTM outputs + void SetLatticeCallback(CorrelationID corr_id, + const SegmentedResultsCallback &callback, + const int result_type); + // Lattice postprocessor + // Applied on both lattice output or CTM output + // Optional if lattice output is used + // Must be set if a result of type RESULT_TYPE_CTM is used + void SetLatticePostprocessor( + const std::shared_ptr &lattice_postprocessor); // Receive a batch of chunks. Will decode them, then return. // If it contains some last chunks for given utterances, it will call @@ -293,31 +301,76 @@ class BatchedThreadedNnet3CudaOnlinePipeline { const std::vector &is_last_chunk, const std::vector &d_ivectors); - void RunDecoder(const std::vector &channels); + void RunDecoder(const std::vector &channels, + const std::vector &is_first_chunk); + + void InitDecoding(const std::vector &channels, + const std::vector &is_first_chunk); void RunCallbacksAndFinalize(const std::vector &corr_ids, const std::vector &channels, const std::vector &is_last_chunk); + void RunBestPathCallbacks(const std::vector &corr_ids, + const std::vector &channels, + const std::vector &is_last_chunk); + + void RunLatticeCallbacks(const std::vector &corr_ids, + const std::vector &channels, + const std::vector &is_last_chunk); + // Set d_features_ptrs_ and d_ivectors_ptrs_ using channels_ void SetFeaturesPtrs(); // If an utterance is done, we call FinalizeDecoding async on // the threadpool // it will call the utterance's callback when done - void FinalizeDecoding(int32 ichannel, const LatticeCallback *callback); + void FinalizeDecoding(int32 ichannel); + // static wrapper for thread pool static void FinalizeDecodingWrapper(void *obj, uint64_t ichannel64, - void *callback_ptr) { + void *ignored) { int32 ichannel = static_cast(ichannel64); - const LatticeCallback *callback = - static_cast(callback_ptr); static_cast(obj) - ->FinalizeDecoding(ichannel, callback); + ->FinalizeDecoding(ichannel); } + + // + // Internal structs + // + + struct ChannelInfo { + int segmentid; + // Set when an endpoint was detected on the previous chunk + bool must_reset_decoder; + // We need to wait for the previous chunk ConcurrentGetRawLattice to finish + // before we can reset the decoder on this channel + std::atomic_bool can_reset_decoder; + BaseFloat segment_offset_seconds; + + std::queue> queue; + std::mutex mutex; + + void Reset() { + segmentid = 0; + must_reset_decoder = false; + can_reset_decoder.store(false); + segment_offset_seconds = 0; + // do not reset queue - a async task might still be executing + // this is fine, even if we mix different corr_ids in the same channel + // all relevant information is stored in CallbackWithOptions + } + }; + + // // Data members + // BatchedThreadedNnet3CudaOnlinePipelineConfig config_; + + // Using unique_ptr to be able to allocate the vector directly with the right + // size We cannot move std::atomic + std::unique_ptr> channels_info_; int32 max_batch_size_; // extracted from config_ // Models const TransitionModel *trans_model_; @@ -340,17 +393,24 @@ class BatchedThreadedNnet3CudaOnlinePipeline { std::vector 
partial_hypotheses_buf_; std::vector end_points_buf_; + // Used to know if a chunk is the end of a segment, but not necessarly end of + // stream + std::vector is_end_of_segment_; + // End of stream (end of last segment) + std::vector is_end_of_stream_; + // The callback is called once the final lattice is ready - std::unordered_map lattice_callbacks_; + std::unordered_map + lattice_callbacks_; + // Used for both final and partial best paths std::unordered_map best_path_callbacks_; // Lock for callbacks std::mutex map_callbacks_m_; - // New channels in the current batch. We've just received - // their first batch - std::vector list_channels_first_chunk_; + // We'll call init decoding on those channels + std::vector init_decoding_list_channels_; std::vector n_samples_valid_, n_input_frames_valid_; @@ -361,11 +421,8 @@ class BatchedThreadedNnet3CudaOnlinePipeline { // their last chunk std::vector list_channels_last_chunk_; std::vector list_corr_id_last_chunk_; - std::vector list_lattice_callbacks_last_chunk_; - - // Number of frames already computed in channel (before - // curr_batch_) - std::vector channel_frame_offset_; + std::vector> + list_lattice_callbacks_last_chunk_; // Parameters extracted from the models int input_frames_per_chunk_; @@ -410,6 +467,9 @@ class BatchedThreadedNnet3CudaOnlinePipeline { // Only used if feature extraction is run on the CPU std::vector> feature_pipelines_; + // Use to postprocess lattices/generate CTM outputs + std::shared_ptr lattice_postprocessor_; + // HCLG graph : CudaFst object is a host object, but contains // data stored in // GPU memory diff --git a/src/cudadecoder/batched-threaded-nnet3-cuda-pipeline2.cc b/src/cudadecoder/batched-threaded-nnet3-cuda-pipeline2.cc index b0dc39ed486..af69a1cadf7 100644 --- a/src/cudadecoder/batched-threaded-nnet3-cuda-pipeline2.cc +++ b/src/cudadecoder/batched-threaded-nnet3-cuda-pipeline2.cc @@ -293,7 +293,8 @@ void BatchedThreadedNnet3CudaPipeline2::SegmentedDecodeWithCallback( } else { std::unique_ptr> h_wave_segment( new SubVector(h_wave.Data() + offset, nsamples)); - BaseFloat offset_seconds = offset / model_freq_; + BaseFloat offset_seconds = + std::floor(static_cast(offset) / model_freq_); // Saving this segment offset in result for later use (*segmented_results)[isegment].SetTimeOffsetSeconds(offset_seconds); @@ -304,25 +305,9 @@ void BatchedThreadedNnet3CudaPipeline2::SegmentedDecodeWithCallback( // call the segmented callback with the vector of results LatticeCallback callback = [=](CompactLattice &clat) { CudaPipelineResult &result = (*segmented_results)[isegment]; - if (result_type & CudaPipelineResult::RESULT_TYPE_LATTICE) { - if (lattice_postprocessor_) { - CompactLattice postprocessed_clat; - bool ok = lattice_postprocessor_->GetPostprocessedLattice( - clat, &postprocessed_clat); - if (ok) result.SetLatticeResult(std::move(postprocessed_clat)); - } else { - result.SetLatticeResult(std::move(clat)); - } - } - if (result_type & CudaPipelineResult::RESULT_TYPE_CTM) { - CTMResult ctm_result; - KALDI_ASSERT(lattice_postprocessor_ && - "A lattice postprocessor must be set with " - "SetLatticePostprocessor() to use RESULT_TYPE_CTM"); - bool ok = lattice_postprocessor_->GetCTM(clat, &ctm_result); - if (ok) result.SetCTMResult(std::move(ctm_result)); - } + SetResultUsingLattice(clat, result_type, lattice_postprocessor_, + &result); int n_not_done = n_segments_callbacks_not_done_->fetch_sub(1); if (n_not_done == 1 && segmented_callback) { diff --git a/src/cudadecoder/cuda-pipeline-common.cc 
b/src/cudadecoder/cuda-pipeline-common.cc new file mode 100644 index 00000000000..7e29e21588c --- /dev/null +++ b/src/cudadecoder/cuda-pipeline-common.cc @@ -0,0 +1,146 @@ +// cudadecoder/cuda-pipeline-common.cc +// +// Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved. +// Hugo Braun +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#if HAVE_CUDA == 1 + +#include "cudadecoder/cuda-pipeline-common.h" + +#define KALDI_CUDA_DECODER_BIN_FLOAT_PRINT_PRECISION 2 + +namespace kaldi { +namespace cuda_decoder { + +int NumberOfSegments(int nsamples, int seg_length, int seg_shift) { + KALDI_ASSERT(seg_shift > 0); + KALDI_ASSERT(seg_length >= seg_shift); + int r = seg_length - seg_shift; + if (nsamples <= seg_length) return 1; + int nsegments = ((nsamples - r) + seg_shift - 1) / seg_shift; + return nsegments; +} + +void WriteLattices(std::vector &results, + const std::string &key, bool print_offsets, + CompactLatticeWriter &clat_writer) { + for (CudaPipelineResult &result : results) { + double offset = result.GetTimeOffsetSeconds(); + if (!result.HasValidResult()) { + KALDI_WARN << "Utterance " << key << ": " + << " Segment with offset " << offset + << " is not valid. Skipping"; + } + + std::ostringstream key_with_offset; + key_with_offset << key; + if (print_offsets) key_with_offset << "-" << offset; + clat_writer.Write(key_with_offset.str(), result.GetLatticeResult()); + if (!print_offsets) { + if (results.size() > 1) { + KALDI_WARN << "Utterance " << key + << " has multiple segments but only one is written to " + "output. Use print_offsets=true"; + } + break; // printing only one result if offsets are not used + } + } +} + +// Reads all CTM outputs in results and merge them together +// into a single output. That output is then written as a CTM text format to +// ostream +void MergeSegmentsToCTMOutput(std::vector &results, + const std::string &key, std::ostream &ostream, + fst::SymbolTable *word_syms, + bool use_segment_offsets) { + size_t nresults = results.size(); + + if (nresults == 0) { + KALDI_WARN << "Utterance " << key << " has no results. Skipping"; + return; + } + + bool all_results_valid = true; + + for (size_t iresult = 0; iresult < nresults; ++iresult) + all_results_valid &= results[iresult].HasValidResult(); + + if (!all_results_valid) { + KALDI_WARN << "Utterance " << key + << " has at least one segment with an error. Skipping"; + return; + } + + ostream << std::fixed; + ostream.precision(KALDI_CUDA_DECODER_BIN_FLOAT_PRINT_PRECISION); + + // opt: combine results into one here + BaseFloat previous_segment_word_end = 0; + for (size_t iresult = 0; iresult < nresults; ++iresult) { + bool this_segment_first_word = true; + bool is_last_segment = ((iresult + 1) == nresults); + BaseFloat next_offset_seconds = std::numeric_limits::max(); + if (!is_last_segment) { + next_offset_seconds = results[iresult + 1].GetTimeOffsetSeconds(); + } + + auto &result = results[iresult]; + BaseFloat offset_seconds = + use_segment_offsets ? 
result.GetTimeOffsetSeconds() : 0; + int isegment = result.GetSegmentID(); + auto &ctm = result.GetCTMResult(); + for (size_t iword = 0; iword < ctm.times_seconds.size(); ++iword) { + BaseFloat word_from = offset_seconds + ctm.times_seconds[iword].first; + BaseFloat word_to = offset_seconds + ctm.times_seconds[iword].second; + + // If beginning of this segment, only keep "new" words + // i.e. the ones that were not already in previous segment + if (this_segment_first_word) { + if (word_from >= previous_segment_word_end) { + // Found the first "new" word for this segment + this_segment_first_word = false; + } else + continue; // skipping this word + } + + // If end of this segment, skip the words which are + // overlapping two segments + if (!is_last_segment) { + if (word_from >= next_offset_seconds) break; // done with this segment + } + + previous_segment_word_end = word_to; + + ostream << key << " " << isegment << " " << word_from << ' ' + << (word_to - word_from) << ' '; + + int32 word_id = ctm.words[iword]; + if (word_syms) + ostream << word_syms->Find(word_id); + else + ostream << word_id; + + ostream << ' ' << ctm.conf[iword] << '\n'; + } + } +} + +} // namespace cuda_decoder +} // namespace kaldi + +#endif diff --git a/src/cudadecoder/cuda-pipeline-common.h b/src/cudadecoder/cuda-pipeline-common.h index 31cf31cd4ab..bf8fdb39717 100644 --- a/src/cudadecoder/cuda-pipeline-common.h +++ b/src/cudadecoder/cuda-pipeline-common.h @@ -33,14 +33,7 @@ namespace cuda_decoder { // Number of segments of a given length and shift in an utterance of total // length nsamples -inline int NumberOfSegments(int nsamples, int seg_length, int seg_shift) { - KALDI_ASSERT(seg_shift > 0); - KALDI_ASSERT(seg_length >= seg_shift); - int r = seg_length - seg_shift; - if (nsamples <= seg_length) return 1; - int nsegments = ((nsamples - r) + seg_shift - 1) / seg_shift; - return nsegments; -} +int NumberOfSegments(int nsamples, int seg_length, int seg_shift); // Segmentation config struct, used in cuda pipelines struct CudaPipelineSegmentationConfig { @@ -78,17 +71,26 @@ class CudaPipelineResult { CompactLattice clat_; CTMResult ctm_result_; BaseFloat offset_seconds_; + int32 segment_id_; + bool is_last_segment_; public: static constexpr int RESULT_TYPE_LATTICE = 1; static constexpr int RESULT_TYPE_CTM = 2; - CudaPipelineResult() : result_type_(0), offset_seconds_(0) {} + CudaPipelineResult() + : result_type_(0), + offset_seconds_(0), + segment_id_(0), + is_last_segment_(false) {} int32 GetResultType() const { return result_type_; } bool HasValidResult() const { return result_type_; } + int32 GetSegmentID() { return segment_id_; } + bool IsLastSegment() { return is_last_segment_; } + void SetLatticeResult(CompactLattice &&clat) { result_type_ |= RESULT_TYPE_LATTICE; // We can switch to std::forward if there's a use for lvalues @@ -100,13 +102,13 @@ class CudaPipelineResult { ctm_result_ = std::move(ctm); } - const CompactLattice &GetLatticeResult() const { + CompactLattice &GetLatticeResult() { KALDI_ASSERT("Lattice result was not requested" && result_type_ & RESULT_TYPE_LATTICE); return clat_; } - const CTMResult &GetCTMResult() const { + CTMResult &GetCTMResult() { KALDI_ASSERT("CTM result was not requested" && result_type_ & RESULT_TYPE_CTM); return ctm_result_; @@ -117,6 +119,9 @@ class CudaPipelineResult { offset_seconds_ = offset_seconds; } + void SetSegmentID(int segment_id) { segment_id_ = segment_id; } + void SetAsLastSegment() { is_last_segment_ = true; } + BaseFloat GetTimeOffsetSeconds() const { return 
offset_seconds_; } }; @@ -128,6 +133,13 @@ typedef std::function SegmentedResultsCallback; typedef std::function LatticeCallback; +struct CallbackWithOptions { + SegmentedResultsCallback callback; + int result_type; + int segment_id; + bool is_last_segment; +}; + struct HostDeviceVector { cudaEvent_t evt; BaseFloat *h_data; @@ -178,8 +190,23 @@ struct HostDeviceVector { } } }; -} // namespace cuda_decoder +// Write all lattices in results using clat_writer +// If print_offsets is true, will write each lattice +// under the key=[utterance_key]-[offset in seconds] +// prints_offsets should be true if results.size() > 1 +void WriteLattices(std::vector &results, + const std::string &key, bool print_offsets, + CompactLatticeWriter &clat_writer); + +// Reads all CTM outputs in results and merge them together +// into a single output. That output is then written as a CTM text format to +// ostream +void MergeSegmentsToCTMOutput(std::vector &results, + const std::string &key, std::ostream &ostream, + fst::SymbolTable *word_syms = NULL, + bool use_segment_offsets = true); +} // namespace cuda_decoder } // namespace kaldi #endif // KALDI_CUDA_DECODER_CUDA_PIPELINE_COMMON_ diff --git a/src/cudadecoder/lattice-postprocessor.cc b/src/cudadecoder/lattice-postprocessor.cc index 686f6b437ac..46d44216890 100644 --- a/src/cudadecoder/lattice-postprocessor.cc +++ b/src/cudadecoder/lattice-postprocessor.cc @@ -16,6 +16,7 @@ // limitations under the License. #include "cudadecoder/lattice-postprocessor.h" + #include "fstext/fstext-lib.h" #include "lat/kaldi-lattice.h" #include "lat/lattice-functions.h" @@ -53,6 +54,9 @@ void LatticePostprocessor::ApplyConfig() { bool LatticePostprocessor::GetPostprocessedLattice( CompactLattice &clat, CompactLattice *out_clat) const { + // Nothing to do for empty lattice + if (clat.NumStates() == 0) return true; + bool ok = true; // Scale lattice if (use_lattice_scale_) fst::ScaleLattice(lattice_scales_, &clat); @@ -77,15 +81,20 @@ bool LatticePostprocessor::GetPostprocessedLattice( if (!word_info_) KALDI_ERR << "You must set --word-boundary-rxfilename in the lattice " "postprocessor config"; - - ok &= WordAlignLattice(clat, *tmodel_, *word_info_, max_states, out_clat); + // ok &= + // Ignoring the return false for now (but will print a warning), + // because the doc says we can, and it can happen when using endpointing + WordAlignLattice(clat, *tmodel_, *word_info_, max_states, out_clat); return ok; } bool LatticePostprocessor::GetCTM(CompactLattice &clat, CTMResult *ctm_result) const { + // Empty CTM output for empty lattice + if (clat.NumStates() == 0) return true; + CompactLattice postprocessed_lattice; - if (!GetPostprocessedLattice(clat, &postprocessed_lattice)) return false; + GetPostprocessedLattice(clat, &postprocessed_lattice); // MBR MinimumBayesRisk mbr(postprocessed_lattice, config_.mbr_opts); @@ -101,6 +110,31 @@ bool LatticePostprocessor::GetCTM(CompactLattice &clat, return true; } + +void SetResultUsingLattice( + CompactLattice &clat, const int result_type, + const std::shared_ptr &lattice_postprocessor, + CudaPipelineResult *result) { + if (result_type & CudaPipelineResult::RESULT_TYPE_LATTICE) { + if (lattice_postprocessor) { + CompactLattice postprocessed_clat; + lattice_postprocessor->GetPostprocessedLattice(clat, &postprocessed_clat); + result->SetLatticeResult(std::move(postprocessed_clat)); + } else { + result->SetLatticeResult(std::move(clat)); + } + } + + if (result_type & CudaPipelineResult::RESULT_TYPE_CTM) { + CTMResult ctm_result; + 
KALDI_ASSERT(lattice_postprocessor && + "A lattice postprocessor must be set with " + "SetLatticePostprocessor() to use RESULT_TYPE_CTM"); + lattice_postprocessor->GetCTM(clat, &ctm_result); + result->SetCTMResult(std::move(ctm_result)); + } +} + } // namespace cuda_decoder } // namespace kaldi diff --git a/src/cudadecoder/lattice-postprocessor.h b/src/cudadecoder/lattice-postprocessor.h index 73d9aef4e9b..4ca80c15314 100644 --- a/src/cudadecoder/lattice-postprocessor.h +++ b/src/cudadecoder/lattice-postprocessor.h @@ -105,6 +105,26 @@ class LatticePostprocessor { word_boundary_rxfilename); } }; + +void SetResultUsingLattice( + CompactLattice &clat, const int result_type, + const std::shared_ptr &lattice_postprocessor, + CudaPipelineResult *result); + +// Read lattice postprocessor config, apply it, +// and assign it to the pipeline +template +void LoadAndSetLatticePostprocessor(const std::string &config_filename, + PIPELINE *cuda_pipeline) { + ParseOptions po(""); // No usage, reading from a file + LatticePostprocessorConfig pp_config; + pp_config.Register(&po); + po.ReadConfigFile(config_filename); + auto lattice_postprocessor = + std::make_shared(pp_config); + cuda_pipeline->SetLatticePostprocessor(lattice_postprocessor); +} + } // namespace cuda_decoder } // namespace kaldi diff --git a/src/cudadecoderbin/batched-wav-nnet3-cuda-online.cc b/src/cudadecoderbin/batched-wav-nnet3-cuda-online.cc index 6bad51f9a6e..7eb67b9f55c 100644 --- a/src/cudadecoderbin/batched-wav-nnet3-cuda-online.cc +++ b/src/cudadecoderbin/batched-wav-nnet3-cuda-online.cc @@ -23,15 +23,15 @@ #error CUDA support must be configured to compile this binary. #endif +#include +#include +#include + #include #include #include #include -#include -#include -#include - #include "cudadecoder/cuda-online-pipeline-dynamic-batcher.h" #include "cudadecoderbin/cuda-bin-tools.h" #include "cudamatrix/cu-allocator.h" @@ -58,11 +58,11 @@ struct Stream { Stream(const std::shared_ptr &wav, CorrelationID corr_id, double send_next_chunk_at, double *latency_ptr) - : wav(wav), - corr_id(corr_id), - offset(0), - send_next_chunk_at(send_next_chunk_at), - latency_ptr(latency_ptr) {} + : wav(wav), + corr_id(corr_id), + offset(0), + send_next_chunk_at(send_next_chunk_at), + latency_ptr(latency_ptr) {} bool operator<(const Stream &other) const { return (send_next_chunk_at > other.send_next_chunk_at); @@ -80,15 +80,35 @@ int main(int argc, char *argv[]) { fst::SymbolTable *word_syms; ReadModels(opts, &trans_model, &am_nnet, &decode_fst, &word_syms); BatchedThreadedNnet3CudaOnlinePipeline cuda_pipeline( - opts.batched_decoder_config, *decode_fst, am_nnet, trans_model); + opts.batched_decoder_config, *decode_fst, am_nnet, trans_model); delete decode_fst; if (word_syms) cuda_pipeline.SetSymbolTable(*word_syms); - CompactLatticeWriter clat_writer(opts.clat_wspecifier); - std::mutex clat_writer_m; + std::unique_ptr clat_writer; + std::unique_ptr ctm_writer; + OpenOutputHandles(opts.clat_wspecifier, &clat_writer, &ctm_writer); + + std::mutex output_writer_m_; if (!opts.write_lattice) { - KALDI_LOG << ("If you want to write lattices to disk, please set " - "--write-lattice=true"); + KALDI_LOG + << ("If you want to write lattices to disk, please set " + "--write-lattice=true"); + clat_writer.reset(); + } + // result_type is used by the pipeline to know what to generate + int result_type = 0; + if (ctm_writer) result_type |= CudaPipelineResult::RESULT_TYPE_CTM; + if (clat_writer) result_type |= CudaPipelineResult::RESULT_TYPE_LATTICE; + + // Lattice 
postprocessor + if (opts.lattice_postprocessor_config_rxfilename.empty()) { + if (ctm_writer) { + KALDI_ERR << "You must configure the lattice postprocessor with " + "--lattice-postprocessor-rxfilename to use CTM output"; + } + } else { + LoadAndSetLatticePostprocessor( + opts.lattice_postprocessor_config_rxfilename, &cuda_pipeline); } int chunk_length = cuda_pipeline.GetNSampsPerChunk(); @@ -101,10 +121,10 @@ int main(int argc, char *argv[]) { KALDI_ASSERT(all_wav.size() > 0); KALDI_ASSERT(all_wav.size() == all_wav_keys.size()); KALDI_LOG << "Loaded " << all_wav.size() << "files."; - for (int i=0; i < all_wav.size(); ++i) { + for (int i = 0; i < all_wav.size(); ++i) { if (all_wav[i]->Data().NumRows() <= 0) { - KALDI_ERR << "Bad file, 0 channels at index [" << i << "], id=" - << all_wav_keys[i]; + KALDI_ERR << "Bad file, 0 channels at index [" << i + << "], id=" << all_wav_keys[i]; } } @@ -152,7 +172,7 @@ int main(int argc, char *argv[]) { // "spoken" e.g. if the first chunk is made of 0.5s for audio, we have // to wait 0.5s after stream_will_start_at double first_chunk_available_at = - stream_will_start_at + std::min(stream_duration, chunk_seconds); + stream_will_start_at + std::min(stream_duration, chunk_seconds); // stream_will_stop_at is used for latency computation. // Streaming starts at t0 = stream_will_start_at and ends at @@ -163,51 +183,72 @@ int main(int argc, char *argv[]) { // we'll do lat_ptr = now - lat_ptr in callback *latency_ptr = stream_will_stop_at; - // Define the callback for results. - cuda_pipeline.SetBestPathCallback( - corr_id, - [&, latency_ptr, corr_id](const std::string &str, bool partial, - bool endpoint_detected) { - if (partial && opts.print_partial_hypotheses) { - KALDI_LOG << "corr_id #" << corr_id << " [partial] : " << str; - } - - if (endpoint_detected && opts.print_endpoints) { - KALDI_LOG << "corr_id #" << corr_id << " [endpoint detected]"; - } - - if (!partial) { - // *latency_ptr currently contains t1="stream_will_start_at + - // duration" where stream_will_start_at is the time when this - // stream has started and 'duration' is the duration of this - // audio file, so t1 is the time when the virtual user is done - // talking. timer.Elapsed() now contains t2, i.e. when the - // result is ready, the latency = t2 - t1. - if (!opts.generate_lattice) { - // If generating a lattice, latency will include the time - // spent generating it. - *latency_ptr = timer.Elapsed() - *latency_ptr; + // Define the callback for results + bool use_bestpath_callback = !result_type || + opts.print_partial_hypotheses || + opts.print_endpoints; + if (use_bestpath_callback) { + cuda_pipeline.SetBestPathCallback( + corr_id, [latency_ptr, &timer, &opts, corr_id, result_type]( + const std::string &str, bool partial, + bool endpoint_detected) { + if (partial && opts.print_partial_hypotheses) + KALDI_LOG << "corr_id #" << corr_id << " [partial] : " << str; + + if (endpoint_detected && opts.print_endpoints) + KALDI_LOG << "corr_id #" << corr_id << " [endpoint detected]"; + + if (!partial) { + // *latency_ptr currently contains t1="stream_will_start_at + + // duration" where stream_will_start_at is when this stream + // started and duration is the duration of this audio file so + // t1 is the time when the virtual user is done talking + // timer.Elapsed() now contains t2, i.e. 
when the result is + // ready latency = t2 - t1 + if (!result_type) { + // If we need to gen a lattice, latency will take the + // lattice gen into account + *latency_ptr = timer.Elapsed() - *latency_ptr; + } + if (opts.print_hypotheses) + KALDI_LOG << "corr_id #" << corr_id << " : " << str; } - if (opts.print_hypotheses) - KALDI_LOG << "corr_id #" << corr_id << " : " << str; - } - }); + }); + } - if (opts.generate_lattice) { + if (result_type) { // Setting a callback will indicate the pipeline to generate a // lattice int iter = all_wav_i / all_wav.size(); std::string key = all_wav_keys[all_wav_i_modulo]; if (iter > 0) key += std::to_string(iter); - cuda_pipeline.SetLatticeCallback( - corr_id, - [&, key, latency_ptr](CompactLattice &clat) { - *latency_ptr = timer.Elapsed() - *latency_ptr; - if (opts.write_lattice) { - std::lock_guard lk(clat_writer_m); - clat_writer.Write(key, clat); - } - }); + SegmentedResultsCallback segmented_callback = + [&clat_writer, latency_ptr, &timer, &ctm_writer, + &output_writer_m_, key, + word_syms](SegmentedLatticeCallbackParams ¶ms) { + if (params.results.empty()) { + KALDI_WARN << "Empty result for callback"; + return; + } + if (params.results[0].IsLastSegment()) { + *latency_ptr = timer.Elapsed() - *latency_ptr; + } + + if (clat_writer) { + std::lock_guard lk(output_writer_m_); + clat_writer->Write(key, params.results[0].GetLatticeResult()); + } + + if (ctm_writer) { + std::lock_guard lk(output_writer_m_); + MergeSegmentsToCTMOutput(params.results, key, + ctm_writer->Stream(), word_syms, + /* use segment offset */ false); + } + }; + + cuda_pipeline.SetLatticeCallback(corr_id, segmented_callback, + result_type); } // Adding that stream to our simulation stream pool @@ -231,9 +272,9 @@ int main(int argc, char *argv[]) { // Current chunk int32 total_num_samp = data.Dim(); int this_chunk_num_samp = - std::min(total_num_samp - chunk.offset, chunk_length); + std::min(total_num_samp - chunk.offset, chunk_length); bool is_last_chunk = - ((chunk.offset + this_chunk_num_samp) == total_num_samp); + ((chunk.offset + this_chunk_num_samp) == total_num_samp); bool is_first_chunk = (chunk.offset == 0); SubVector wave_part(data, chunk.offset, this_chunk_num_samp); @@ -250,7 +291,7 @@ int main(int argc, char *argv[]) { // send_next_chunk_at with send_next_chunk_at = send_current_chunk_at + // next_chunk_duration int next_chunk_num_samp = - std::min(total_num_samp - chunk.offset, chunk_length); + std::min(total_num_samp - chunk.offset, chunk_length); double next_chunk_seconds = next_chunk_num_samp * seconds_per_sample; chunk.send_next_chunk_at += next_chunk_seconds; @@ -266,7 +307,7 @@ int main(int argc, char *argv[]) { PrintLatencyStats(latencies); delete word_syms; - clat_writer.Close(); + if (clat_writer) clat_writer->Close(); cudaDeviceSynchronize(); return 0; diff --git a/src/cudadecoderbin/batched-wav-nnet3-cuda2.cc b/src/cudadecoderbin/batched-wav-nnet3-cuda2.cc index d748cc8bd1c..992b34598d2 100644 --- a/src/cudadecoderbin/batched-wav-nnet3-cuda2.cc +++ b/src/cudadecoderbin/batched-wav-nnet3-cuda2.cc @@ -21,7 +21,9 @@ #include #include #include + #include + #include "cudadecoder/batched-threaded-nnet3-cuda-pipeline2.h" #include "cudadecoderbin/cuda-bin-tools.h" #include "cudamatrix/cu-allocator.h" diff --git a/src/cudadecoderbin/cuda-bin-tools.h b/src/cudadecoderbin/cuda-bin-tools.h index 1672f9ac0b7..0cf21a9f5f4 100644 --- a/src/cudadecoderbin/cuda-bin-tools.h +++ b/src/cudadecoderbin/cuda-bin-tools.h @@ -33,8 +33,7 @@ namespace kaldi { namespace cuda_decoder { - 
-/// Print some statistics based on latencies stored in \p latencies. +// Print some statistics based on latencies stored in \p latencies. inline void PrintLatencyStats(std::vector &latencies) { if (latencies.empty()) return; double total = std::accumulate(latencies.begin(), latencies.end(), 0.); @@ -66,6 +65,7 @@ struct CudaOnlineBinaryOptions { bool generate_lattice = false; std::string word_syms_rxfilename, nnet3_rxfilename, fst_rxfilename, wav_rspecifier, clat_wspecifier; + std::string lattice_postprocessor_config_rxfilename; BatchedThreadedNnet3CudaOnlinePipelineConfig batched_decoder_config; }; @@ -73,11 +73,11 @@ inline int SetUpAndReadCmdLineOptions(int argc, char *argv[], CudaOnlineBinaryOptions *opts_ptr) { CudaOnlineBinaryOptions &opts = *opts_ptr; const char *usage = - "Reads in wav file(s) and simulates online decoding with neural nets\n" - "(nnet3 setup). Note: some configuration values and inputs are\n" - "set via config files whose filenames are passed as options\n\n" - "Usage: batched-wav-nnet3-cuda-online [options] " - " \n"; + "Reads in wav file(s) and simulates online decoding with neural nets\n" + "(nnet3 setup). Note: some configuration values and inputs are\n" + "set via config files whose filenames are passed as options\n\n" + "Usage: batched-wav-nnet3-cuda-online [options] " + " \n"; ParseOptions po(usage); po.Register("print-hypotheses", &opts.print_hypotheses, @@ -100,6 +100,9 @@ inline int SetUpAndReadCmdLineOptions(int argc, char *argv[], po.Register("generate-lattice", &opts.generate_lattice, "Generate full lattices"); po.Register("write-lattice", &opts.write_lattice, "Output lattice to a file"); + po.Register("lattice-postprocessor-rxfilename", + &opts.lattice_postprocessor_config_rxfilename, + "(optional) Config file for lattice postprocessor"); CuDevice::RegisterDeviceOptions(&po); RegisterCuAllocatorOptions(&po); @@ -175,83 +178,6 @@ inline void ReadDataset(const CudaOnlineBinaryOptions &opts, std::cout << "done" << std::endl; } -// Reads all CTM outputs in results and merge them together into a single -// output. That output is then written as a CTM text format to ostream. -inline void MergeSegmentsToCTMOutput(std::vector &results, - const std::string &key, - std::ostream &ostream, - fst::SymbolTable *word_syms = NULL) { - size_t nresults = results.size(); - - if (nresults == 0) { - KALDI_WARN << "Utterance " << key << " has no results. Skipping"; - return; - } - - bool all_results_valid = true; - - for (size_t iresult = 0; iresult < nresults; ++iresult) - all_results_valid &= results[iresult].HasValidResult(); - - if (!all_results_valid) { - KALDI_WARN << "Utterance " << key - << " has at least one segment with an error. 
Skipping"; - return; - } - - ostream << std::fixed; - ostream.precision(2); - - // opt: combine results into one here - BaseFloat previous_segment_word_end = 0; - for (size_t iresult = 0; iresult < nresults; ++iresult) { - bool this_segment_first_word = true; - bool is_last_segment = ((iresult + 1) == nresults); - BaseFloat next_offset_seconds = FLT_MAX; - if (!is_last_segment) { - next_offset_seconds = results[iresult + 1].GetTimeOffsetSeconds(); - } - - auto &result = results[iresult]; - BaseFloat offset_seconds = result.GetTimeOffsetSeconds(); - auto &ctm = result.GetCTMResult(); - for (size_t iword = 0; iword < ctm.times_seconds.size(); ++iword) { - BaseFloat word_from = offset_seconds + ctm.times_seconds[iword].first; - BaseFloat word_to = offset_seconds + ctm.times_seconds[iword].second; - - // If beginning of this segment, only keep "new" words - // i.e. the ones that were not already in previous segment - if (this_segment_first_word) { - if (word_from >= previous_segment_word_end) { - // Found the first "new" word for this segment - this_segment_first_word = false; - } else - continue; // skipping this word - } - - // If end of this segment, skip the words which are - // overlapping two segments - if (!is_last_segment) { - if (word_from >= next_offset_seconds) break; // done with this segment - } - - previous_segment_word_end = word_to; - - ostream << key << " 1 " << word_from << ' ' << (word_to - word_from) - << ' '; - - int32 word_id = ctm.words[iword]; - if (word_syms) { - ostream << word_syms->Find(word_id); - } else { - ostream << word_id; - } - - ostream << ' ' << ctm.conf[iword] << '\n'; - } - } -} - inline void OpenOutputHandles( const std::string &output_wspecifier, std::unique_ptr *clat_writer, @@ -268,48 +194,6 @@ inline void OpenOutputHandles( } } -// Write all lattices in results using clat_writer. If print_offsets is true, -// write each lattice under the key="[utterance_key]-[offset in seconds]". -// 'prints_offsets' should be true if results.size() > 1 -void WriteLattices(std::vector &results, - const std::string &key, bool print_offsets, - CompactLatticeWriter &clat_writer) { - for (const CudaPipelineResult &result : results) { - double offset = result.GetTimeOffsetSeconds(); - if (!result.HasValidResult()) { - KALDI_WARN << "Utterance " << key << ": " - << " Segment with offset " << offset - << " is not valid. Skipping"; - } - - std::ostringstream key_with_offset; - key_with_offset << key; - if (print_offsets) key_with_offset << "-" << offset; - clat_writer.Write(key_with_offset.str(), result.GetLatticeResult()); - if (!print_offsets) { - if (results.size() > 1) { - KALDI_WARN << "Utterance " << key - << (" has multiple segments but only one is written to" - " output. Use print_offsets=true"); - } - break; // Print only one result if offsets are not used. - } - } -} - -// Read lattice postprocessor config, apply it, and assign it to the pipeline. -inline void LoadAndSetLatticePostprocessor( - const std::string &config_filename, - BatchedThreadedNnet3CudaPipeline2 *cuda_pipeline) { - ParseOptions po(""); // No usage, reading from a file - LatticePostprocessorConfig pp_config; - pp_config.Register(&po); - po.ReadConfigFile(config_filename); - auto lattice_postprocessor = - std::make_shared(pp_config); - cuda_pipeline->SetLatticePostprocessor(lattice_postprocessor); -} - } // namespace cuda_decoder } // namespace kaldi