Skip to content

Commit

Permalink
Resolve "Long stream for streaming"
Browse files Browse the repository at this point in the history
  • Loading branch information
hugovbraun committed Jul 14, 2021
1 parent db74e2a commit d69750b
Show file tree
Hide file tree
Showing 11 changed files with 741 additions and 375 deletions.
2 changes: 1 addition & 1 deletion src/cudadecoder/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ OBJFILES = cuda-decoder.o cuda-decoder-kernels.o cuda-fst.o \
batched-threaded-nnet3-cuda-pipeline2.o \
batched-static-nnet3.o batched-static-nnet3-kernels.o \
cuda-online-pipeline-dynamic-batcher.o decodable-cumatrix.o \
lattice-postprocessor.o
cuda-pipeline-common.o lattice-postprocessor.o

LIBNAME = kaldi-cudadecoder

Expand Down
423 changes: 295 additions & 128 deletions src/cudadecoder/batched-threaded-nnet3-cuda-online-pipeline.cc

Large diffs are not rendered by default.

112 changes: 86 additions & 26 deletions src/cudadecoder/batched-threaded-nnet3-cuda-online-pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include "base/kaldi-utils.h"
#include "cudadecoder/batched-static-nnet3.h"
#include "cudadecoder/cuda-decoder.h"
#include "cudadecoder/cuda-pipeline-common.h"
#include "cudadecoder/lattice-postprocessor.h"
#include "cudadecoder/thread-pool-light.h"
#include "cudafeat/online-batched-feature-pipeline-cuda.h"
#include "feat/wave-reader.h"
Expand Down Expand Up @@ -64,7 +66,8 @@ struct BatchedThreadedNnet3CudaOnlinePipelineConfig {
num_worker_threads(-1),
determinize_lattice(true),
num_decoder_copy_threads(2),
use_gpu_feature_extraction(true) {}
use_gpu_feature_extraction(true),
reset_on_endpoint(false) {}
void Register(OptionsItf *po) {
po->Register("max-batch-size", &max_batch_size,
"The maximum execution batch size. "
Expand All @@ -85,6 +88,9 @@ struct BatchedThreadedNnet3CudaOnlinePipelineConfig {
"the host to host copies.");
po->Register("gpu-feature-extract", &use_gpu_feature_extraction,
"Use GPU feature extraction");
po->Register(
"reset-on-endpoint", &reset_on_endpoint,
"Reset a decoder channel when endpoint detected. Do not close stream");

feature_opts.Register(po);
decoder_opts.Register(po);
Expand All @@ -97,6 +103,7 @@ struct BatchedThreadedNnet3CudaOnlinePipelineConfig {
bool determinize_lattice;
int num_decoder_copy_threads;
bool use_gpu_feature_extraction;
bool reset_on_endpoint;

OnlineNnet2FeaturePipelineConfig feature_opts;
CudaDecoderConfig decoder_opts;
Expand Down Expand Up @@ -165,16 +172,17 @@ class BatchedThreadedNnet3CudaOnlinePipeline {
void SetLatticeCallback(CorrelationID corr_id,
const LatticeCallback &callback);

// Chunk of one utterance. We receive batches of those chunks through
// DecodeBatch
// Contains pointers to that chunk, the corresponding correlation ID,
// and whether that chunk is the last one for that utterance
struct UtteranceChunk {
CorrelationID corr_id;
SubVector<BaseFloat> wave_samples;
bool last_chunk; // sets to true if last chunk for that
// utterance
};
// Set callback using SegmentedResultsCallback
// Able to run lattice postprocessor and generate CTM outputs
void SetLatticeCallback(CorrelationID corr_id,
const SegmentedResultsCallback &callback,
const int result_type);
// Lattice postprocessor
// Applied on both lattice output or CTM output
// Optional if lattice output is used
// Must be set if a result of type RESULT_TYPE_CTM is used
void SetLatticePostprocessor(
const std::shared_ptr<LatticePostprocessor> &lattice_postprocessor);

// Receive a batch of chunks. Will decode them, then return.
// If it contains some last chunks for given utterances, it will call
Expand Down Expand Up @@ -293,31 +301,76 @@ class BatchedThreadedNnet3CudaOnlinePipeline {
const std::vector<bool> &is_last_chunk,
const std::vector<BaseFloat *> &d_ivectors);

void RunDecoder(const std::vector<int> &channels);
void RunDecoder(const std::vector<int> &channels,
const std::vector<bool> &is_first_chunk);

void InitDecoding(const std::vector<int> &channels,
const std::vector<bool> &is_first_chunk);

void RunCallbacksAndFinalize(const std::vector<CorrelationID> &corr_ids,
const std::vector<int> &channels,
const std::vector<bool> &is_last_chunk);

void RunBestPathCallbacks(const std::vector<CorrelationID> &corr_ids,
const std::vector<int> &channels,
const std::vector<bool> &is_last_chunk);

void RunLatticeCallbacks(const std::vector<CorrelationID> &corr_ids,
const std::vector<int> &channels,
const std::vector<bool> &is_last_chunk);

// Set d_features_ptrs_ and d_ivectors_ptrs_ using channels_
void SetFeaturesPtrs();

// If an utterance is done, we call FinalizeDecoding async on
// the threadpool
// it will call the utterance's callback when done
void FinalizeDecoding(int32 ichannel, const LatticeCallback *callback);
void FinalizeDecoding(int32 ichannel);

// static wrapper for thread pool
static void FinalizeDecodingWrapper(void *obj, uint64_t ichannel64,
void *callback_ptr) {
void *ignored) {
int32 ichannel = static_cast<int32>(ichannel64);
const LatticeCallback *callback =
static_cast<const LatticeCallback *>(callback_ptr);
static_cast<BatchedThreadedNnet3CudaOnlinePipeline *>(obj)
->FinalizeDecoding(ichannel, callback);
->FinalizeDecoding(ichannel);
}

//
// Internal structs
//

// Per-channel bookkeeping. Instances are stored in channels_info_.
// Members have no in-class initializers: Reset() is what puts the scalar
// fields into a known state.
struct ChannelInfo {
// Segment index for this channel; Reset() sets it back to 0.
// NOTE(review): presumably incremented each time an endpoint splits the
// stream into a new segment - confirm in the .cc file.
int segmentid;
// Set when an endpoint was detected on the previous chunk
bool must_reset_decoder;
// We need to wait for the previous chunk ConcurrentGetRawLattice to finish
// before we can reset the decoder on this channel
std::atomic_bool can_reset_decoder;
// Time offset of the current segment, in seconds; Reset() sets it to 0.
BaseFloat segment_offset_seconds;

// Queue of callbacks (with their options) attached to this channel.
// Deliberately left untouched by Reset(), see below.
std::queue<std::unique_ptr<CallbackWithOptions>> queue;
// NOTE(review): presumably guards `queue` - confirm against the users of
// this struct.
std::mutex mutex;

// Puts the scalar state back to its initial values.
// The callback queue is intentionally not cleared.
void Reset() {
segmentid = 0;
must_reset_decoder = false;
can_reset_decoder.store(false);
segment_offset_seconds = 0;
// do not reset queue - an async task might still be executing
// this is fine, even if we mix different corr_ids in the same channel
// all relevant information is stored in CallbackWithOptions
}
};

//
// Data members
//

BatchedThreadedNnet3CudaOnlinePipelineConfig config_;

// Using unique_ptr to be able to allocate the vector directly with the right
// size. We cannot move std::atomic
std::unique_ptr<std::vector<ChannelInfo>> channels_info_;
int32 max_batch_size_; // extracted from config_
// Models
const TransitionModel *trans_model_;
Expand All @@ -340,17 +393,24 @@ class BatchedThreadedNnet3CudaOnlinePipeline {
std::vector<const std::string *> partial_hypotheses_buf_;
std::vector<bool> end_points_buf_;

// Used to know if a chunk is the end of a segment, but not necessarily end of
// stream
std::vector<bool> is_end_of_segment_;
// End of stream (end of last segment)
std::vector<bool> is_end_of_stream_;

// The callback is called once the final lattice is ready
std::unordered_map<CorrelationID, const LatticeCallback> lattice_callbacks_;
std::unordered_map<CorrelationID, const CallbackWithOptions>
lattice_callbacks_;

// Used for both final and partial best paths
std::unordered_map<CorrelationID, const BestPathCallback>
best_path_callbacks_;
// Lock for callbacks
std::mutex map_callbacks_m_;

// New channels in the current batch. We've just received
// their first batch
std::vector<int32> list_channels_first_chunk_;
// We'll call init decoding on those channels
std::vector<int32> init_decoding_list_channels_;

std::vector<int> n_samples_valid_, n_input_frames_valid_;

Expand All @@ -361,11 +421,8 @@ class BatchedThreadedNnet3CudaOnlinePipeline {
// their last chunk
std::vector<int> list_channels_last_chunk_;
std::vector<CorrelationID> list_corr_id_last_chunk_;
std::vector<LatticeCallback *> list_lattice_callbacks_last_chunk_;

// Number of frames already computed in channel (before
// curr_batch_)
std::vector<int32> channel_frame_offset_;
std::vector<std::unique_ptr<CallbackWithOptions>>
list_lattice_callbacks_last_chunk_;

// Parameters extracted from the models
int input_frames_per_chunk_;
Expand Down Expand Up @@ -410,6 +467,9 @@ class BatchedThreadedNnet3CudaOnlinePipeline {
// Only used if feature extraction is run on the CPU
std::vector<std::unique_ptr<OnlineNnet2FeaturePipeline>> feature_pipelines_;

// Used to postprocess lattices/generate CTM outputs
std::shared_ptr<LatticePostprocessor> lattice_postprocessor_;

// HCLG graph : CudaFst object is a host object, but contains
// data stored in
// GPU memory
Expand Down
23 changes: 4 additions & 19 deletions src/cudadecoder/batched-threaded-nnet3-cuda-pipeline2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ void BatchedThreadedNnet3CudaPipeline2::SegmentedDecodeWithCallback(
} else {
std::unique_ptr<SubVector<BaseFloat>> h_wave_segment(
new SubVector<BaseFloat>(h_wave.Data() + offset, nsamples));
BaseFloat offset_seconds = offset / model_freq_;
BaseFloat offset_seconds =
std::floor(static_cast<BaseFloat>(offset) / model_freq_);

// Saving this segment offset in result for later use
(*segmented_results)[isegment].SetTimeOffsetSeconds(offset_seconds);
Expand All @@ -304,25 +305,9 @@ void BatchedThreadedNnet3CudaPipeline2::SegmentedDecodeWithCallback(
// call the segmented callback with the vector of results
LatticeCallback callback = [=](CompactLattice &clat) {
CudaPipelineResult &result = (*segmented_results)[isegment];
if (result_type & CudaPipelineResult::RESULT_TYPE_LATTICE) {
if (lattice_postprocessor_) {
CompactLattice postprocessed_clat;
bool ok = lattice_postprocessor_->GetPostprocessedLattice(
clat, &postprocessed_clat);
if (ok) result.SetLatticeResult(std::move(postprocessed_clat));
} else {
result.SetLatticeResult(std::move(clat));
}
}

if (result_type & CudaPipelineResult::RESULT_TYPE_CTM) {
CTMResult ctm_result;
KALDI_ASSERT(lattice_postprocessor_ &&
"A lattice postprocessor must be set with "
"SetLatticePostprocessor() to use RESULT_TYPE_CTM");
bool ok = lattice_postprocessor_->GetCTM(clat, &ctm_result);
if (ok) result.SetCTMResult(std::move(ctm_result));
}
SetResultUsingLattice(clat, result_type, lattice_postprocessor_,
&result);

int n_not_done = n_segments_callbacks_not_done_->fetch_sub(1);
if (n_not_done == 1 && segmented_callback) {
Expand Down
146 changes: 146 additions & 0 deletions src/cudadecoder/cuda-pipeline-common.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// cudadecoder/cuda-pipeline-common.cc
//
// Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
// Hugo Braun
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <limits>
#include <random>
#if HAVE_CUDA == 1

#include "cudadecoder/cuda-pipeline-common.h"

#define KALDI_CUDA_DECODER_BIN_FLOAT_PRINT_PRECISION 2

namespace kaldi {
namespace cuda_decoder {

// Returns how many segments of length seg_length, each starting seg_shift
// samples after the previous one, are needed to cover nsamples samples.
// Requires seg_shift > 0 and seg_length >= seg_shift.
// Any input that fits in a single segment yields 1.
int NumberOfSegments(int nsamples, int seg_length, int seg_shift) {
  KALDI_ASSERT(seg_shift > 0);
  KALDI_ASSERT(seg_length >= seg_shift);

  // Short input: a single segment is enough
  if (nsamples <= seg_length) return 1;

  // Samples shared by two consecutive segments
  const int overlap = seg_length - seg_shift;
  // Each segment contributes seg_shift new samples once the overlap is
  // taken out; round the division up
  const int to_cover = nsamples - overlap;
  return (to_cover + seg_shift - 1) / seg_shift;
}

// Writes the lattice results of an utterance to clat_writer.
//
// results       : per-segment results for utterance `key`
// key           : utterance key used when writing
// print_offsets : if true, every valid segment is written under the key
//                 "<key>-<offset>", where offset is the segment time offset
//                 in seconds. If false, only the first valid segment is
//                 written, under `key`, and a warning is emitted when the
//                 utterance has more than one segment.
// clat_writer   : destination writer
//
// Segments without a valid result are reported with a warning and skipped:
// their lattice is never read.
void WriteLattices(std::vector<CudaPipelineResult> &results,
                   const std::string &key, bool print_offsets,
                   CompactLatticeWriter &clat_writer) {
  for (CudaPipelineResult &result : results) {
    double offset = result.GetTimeOffsetSeconds();
    if (!result.HasValidResult()) {
      KALDI_WARN << "Utterance " << key << ": "
                 << " Segment with offset " << offset
                 << " is not valid. Skipping";
      // Actually skip the segment, as the message says. Falling through
      // would call GetLatticeResult() on a result that holds no valid
      // lattice.
      continue;
    }

    std::ostringstream key_with_offset;
    key_with_offset << key;
    if (print_offsets) key_with_offset << "-" << offset;
    clat_writer.Write(key_with_offset.str(), result.GetLatticeResult());
    if (!print_offsets) {
      if (results.size() > 1) {
        KALDI_WARN << "Utterance " << key
                   << " has multiple segments but only one is written to "
                      "output. Use print_offsets=true";
      }
      break;  // printing only one result if offsets are not used
    }
  }
}

// Reads all CTM outputs in results and merge them together
// into a single output. That output is then written as a CTM text format to
// ostream
void MergeSegmentsToCTMOutput(std::vector<CudaPipelineResult> &results,
const std::string &key, std::ostream &ostream,
fst::SymbolTable *word_syms,
bool use_segment_offsets) {
size_t nresults = results.size();

if (nresults == 0) {
KALDI_WARN << "Utterance " << key << " has no results. Skipping";
return;
}

bool all_results_valid = true;

for (size_t iresult = 0; iresult < nresults; ++iresult)
all_results_valid &= results[iresult].HasValidResult();

if (!all_results_valid) {
KALDI_WARN << "Utterance " << key
<< " has at least one segment with an error. Skipping";
return;
}

ostream << std::fixed;
ostream.precision(KALDI_CUDA_DECODER_BIN_FLOAT_PRINT_PRECISION);

// opt: combine results into one here
BaseFloat previous_segment_word_end = 0;
for (size_t iresult = 0; iresult < nresults; ++iresult) {
bool this_segment_first_word = true;
bool is_last_segment = ((iresult + 1) == nresults);
BaseFloat next_offset_seconds = std::numeric_limits<BaseFloat>::max();
if (!is_last_segment) {
next_offset_seconds = results[iresult + 1].GetTimeOffsetSeconds();
}

auto &result = results[iresult];
BaseFloat offset_seconds =
use_segment_offsets ? result.GetTimeOffsetSeconds() : 0;
int isegment = result.GetSegmentID();
auto &ctm = result.GetCTMResult();
for (size_t iword = 0; iword < ctm.times_seconds.size(); ++iword) {
BaseFloat word_from = offset_seconds + ctm.times_seconds[iword].first;
BaseFloat word_to = offset_seconds + ctm.times_seconds[iword].second;

// If beginning of this segment, only keep "new" words
// i.e. the ones that were not already in previous segment
if (this_segment_first_word) {
if (word_from >= previous_segment_word_end) {
// Found the first "new" word for this segment
this_segment_first_word = false;
} else
continue; // skipping this word
}

// If end of this segment, skip the words which are
// overlapping two segments
if (!is_last_segment) {
if (word_from >= next_offset_seconds) break; // done with this segment
}

previous_segment_word_end = word_to;

ostream << key << " " << isegment << " " << word_from << ' '
<< (word_to - word_from) << ' ';

int32 word_id = ctm.words[iword];
if (word_syms)
ostream << word_syms->Find(word_id);
else
ostream << word_id;

ostream << ' ' << ctm.conf[iword] << '\n';
}
}
}

} // namespace cuda_decoder
} // namespace kaldi

#endif
Loading

0 comments on commit d69750b

Please sign in to comment.