Skip to content

Commit

Permalink
refactor: make indexing graph more readable and add traces
Browse files Browse the repository at this point in the history
  • Loading branch information
variar committed Sep 7, 2021
1 parent 07fc58b commit 72cfdc8
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 83 deletions.
23 changes: 23 additions & 0 deletions src/logdata/include/logdataworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@
#include <variant>

#include <QObject>
#include <QFile>
#include <QTextCodec>

#include <tbb/flow_graph.h>
#include <tbb/task_group.h>

#include "atomicflag.h"
Expand Down Expand Up @@ -147,6 +149,16 @@ class IndexingDataAccessor {
data_->hash_.tailDigest = digest;
}

// Returns the progress value currently stored in the wrapped indexing data
// by forwarding to its getProgress().
int getProgress() const
{
return data_->getProgress();
}

// Stores a new progress value in the wrapped indexing data by forwarding
// to its setProgress().
void setProgress( int progress )
{
data_->setProgress( progress );
}

// Completely clear the indexing data.
void clear()
{
Expand Down Expand Up @@ -201,12 +213,17 @@ class IndexingData {

size_t allocatedSize() const;

// Read and write the stored indexing progress value (progress_).
// NOTE(review): presumably a percentage in 0..100 (callers store 100 when
// indexing is complete) — confirm against calculateProgress().
int getProgress() const;
void setProgress( int progress );

private:
mutable SharedMutex dataMutex_;

LinePositionArray linePosition_;
LineLength maxLength_;

// Last progress value recorded through setProgress(); value-initialized to 0.
int progress_{};

FileDigest hashBuilder_;
IndexedHash hash_;

Expand Down Expand Up @@ -255,6 +272,9 @@ class IndexOperation : public QObject {
void fileCheckFinished( MonitoredFileStatus );

protected:
// A chunk of file content: first is the offset the chunk was read from
// (a negative offset is used as an end-of-stream marker), second is the raw bytes.
using BlockData = std::pair<LineOffset::UnderlyingType, QByteArray>;
// Flow-graph node that is triggered by a continue_msg and emits BlockData
// messages asynchronously through its gateway.
using BlockReader = tbb::flow::async_node<tbb::flow::continue_msg, BlockData>;

// Runs the indexing of the file; presumably indexing starts at
// initialPosition — confirm against the implementation.
// NOTE(review): this comment previously claimed the function returns the
// total size indexed and modifies the passed linePosition and maxLength,
// but the signature returns void and only takes initialPosition.
void doIndex( LineOffset initialPosition );
Expand All @@ -269,6 +289,9 @@ class IndexOperation : public QObject {

void guessEncoding( const QByteArray& block, IndexingData::MutateAccessor& scopedAccessor,
IndexingState& state ) const;

// Reads the file block by block and pushes each block into the gateway,
// followed by a final end-of-stream block (offset -1, no data).
// Returns the accumulated time spent reading.
std::chrono::microseconds readFileInBlocks( QFile& file, BlockReader::gateway_type& gw );
// Indexes a single block: updates the encoding guess, records line positions
// and reports progress. Blocks with a negative offset are ignored.
void indexNextBlock( IndexingState& state, const BlockData& blockData );
};

class FullIndexOperation : public IndexOperation {
Expand Down
2 changes: 1 addition & 1 deletion src/logdata/src/logdata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -552,12 +552,12 @@ std::vector<std::string_view> LogData::RawLines::buildUtf8View() const
lines.push_back( wholeString.substr( 0, nextLineFeed ) );
wholeString.remove_prefix( nextLineFeed + 1 );
nextLineFeed = wholeString.find( '\n' );
;
}

if ( !wholeString.empty() ) {
lines.push_back( wholeString );
}

} catch ( const std::exception& e ) {
LOG_ERROR << "failed to transform lines to utf8 " << e.what();
const auto lastLineOffset = utf8Data_.size();
Expand Down
196 changes: 114 additions & 82 deletions src/logdata/src/logdataworker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,14 @@
#include <cmath>
#include <exception>
#include <functional>
#include <oneapi/tbb/flow_graph.h>
#include <string_view>
#include <thread>

#include <QFile>
#include <QFileInfo>
#include <QMessageBox>

#include <tbb/flow_graph.h>

#include "configuration.h"
#include "dispatch_to.h"
#include "encodingdetector.h"
Expand Down Expand Up @@ -127,6 +126,16 @@ void IndexingData::addAll( const QByteArray& block, LineLength length,
encodingGuess_ = encoding;
}

// Returns the stored progress value.
// No locking is performed here.
// NOTE(review): presumably callers reach this through an accessor that
// already holds dataMutex_ — confirm.
int IndexingData::getProgress() const
{
return progress_;
}

// Overwrites the stored progress value.
// No locking is performed here.
// NOTE(review): presumably callers reach this through a mutating accessor
// that already holds dataMutex_ — confirm.
void IndexingData::setProgress( int progress )
{
progress_ = progress;
}

void IndexingData::clear()
{
maxLength_ = 0_length;
Expand All @@ -136,6 +145,8 @@ void IndexingData::clear()
encodingGuess_ = nullptr;
encodingForced_ = nullptr;

progress_ = {};

const auto& config = Configuration::get();
useFastModificationDetection_ = config.fastModificationDetection();
}
Expand Down Expand Up @@ -407,7 +418,7 @@ FastLinePositionArray IndexOperation::parseDataBlock( LineOffset::UnderlyingType
}

const auto currentDataEnd = posWithinBlock + blockBeginning;

const auto length = ( currentDataEnd - state.pos ) / state.encodingParams.lineFeedWidth
+ state.additional_spaces;

Expand Down Expand Up @@ -451,6 +462,100 @@ void IndexOperation::guessEncoding( const QByteArray& block,
<< state.encodingParams.lineFeedWidth;
}

// Reads the file sequentially in chunks of IndexingBlockSize bytes and pushes
// every chunk, tagged with the offset it was read from, into the gateway.
// Reading stops early when interruptRequest_ is set or when a read fails.
// In all cases a final block with offset -1 and no data is pushed afterwards
// to mark the end of the stream.
// Returns the accumulated time spent reading the blocks.
std::chrono::microseconds IndexOperation::readFileInBlocks( QFile& file,
                                                            BlockReader::gateway_type& gw )
{
    using namespace std::chrono;
    using clock = high_resolution_clock;

    // try_put() may reject the message; retry with a short pause until accepted.
    const auto putWithRetry = [ &gw ]( const BlockData& data ) {
        while ( !gw.try_put( data ) ) {
            std::this_thread::sleep_for( std::chrono::milliseconds( 1 ) );
        }
    };

    LOG_INFO << "Starting IO thread";

    microseconds totalIoTime{};
    while ( !file.atEnd() ) {

        if ( interruptRequest_ ) {
            break;
        }

        // The offset must be captured before the read moves the file position.
        BlockData chunk{ file.pos(), QByteArray{ IndexingBlockSize, Qt::Uninitialized } };
        auto& buffer = chunk.second;

        const auto readStarted = clock::now();
        const auto readBytes = static_cast<int>( file.read( buffer.data(), buffer.size() ) );

        if ( readBytes < 0 ) {
            LOG_ERROR << "Reading past the end of file";
            break;
        }

        // A short read leaves uninitialized bytes at the tail; drop them.
        if ( readBytes < buffer.size() ) {
            buffer.resize( readBytes );
        }

        totalIoTime += duration_cast<microseconds>( clock::now() - readStarted );

        LOG_DEBUG << "Sending block " << chunk.first << " size " << buffer.size();

        putWithRetry( chunk );
    }

    // End-of-stream marker: negative offset, empty payload.
    putWithRetry( BlockData( -1, QByteArray{} ) );

    LOG_INFO << "IO thread done";
    return totalIoTime;
}

// Indexes one block delivered by the block reader.
// - A block with a negative offset is the end-of-stream marker and is skipped.
// - An empty block only refreshes the stored encoding guess.
// - Otherwise the block is parsed, its line positions are appended to the
//   indexing data and indexingProgressed() is emitted whenever the computed
//   progress differs from the value stored in the indexing data.
void IndexOperation::indexNextBlock( IndexingState& state, const BlockData& blockData )
{
    const auto& [ blockOffset, bytes ] = blockData;

    LOG_DEBUG << "Indexing block " << blockOffset << " start";

    if ( blockOffset < 0 ) {
        return;
    }

    IndexingData::MutateAccessor scopedAccessor{ indexing_data_.get() };

    guessEncoding( bytes, scopedAccessor, state );

    if ( bytes.isEmpty() ) {
        scopedAccessor.setEncodingGuess( state.encodingGuess );
        LOG_DEBUG << "Indexing block " << blockOffset << " done";
        return;
    }

    const auto linePositions = parseDataBlock( blockOffset, bytes, state );

    // Clamp the longest line seen so far to what LineLength can represent.
    constexpr auto lengthLimit = std::numeric_limits<LineLength::UnderlyingType>::max();
    auto longestLine = state.max_length;
    if ( longestLine > lengthLimit ) {
        LOG_ERROR << "Too long lines " << longestLine;
        longestLine = lengthLimit;
    }

    scopedAccessor.addAll( bytes,
                           LineLength( static_cast<LineLength::UnderlyingType>( longestLine ) ),
                           linePositions, state.encodingGuess );

    // Notify listeners, but only when the progress value actually moved.
    const auto progress
        = ( state.file_size > 0 ) ? calculateProgress( state.pos, state.file_size ) : 100;

    const bool progressChanged = progress != scopedAccessor.getProgress();
    if ( progressChanged ) {
        scopedAccessor.setProgress( progress );
        LOG_INFO << "Indexing progress " << progress << ", indexed size " << state.pos;
        emit indexingProgressed( progress );
    }

    LOG_DEBUG << "Indexing block " << blockOffset << " done";
}

void IndexOperation::doIndex( LineOffset initialPosition )
{
QFile file( fileName_ );
Expand All @@ -465,6 +570,7 @@ void IndexOperation::doIndex( LineOffset initialPosition )
scopedAccessor.clear();
scopedAccessor.setEncodingGuess( QTextCodec::codecForLocale() );

scopedAccessor.setProgress( 100 );
emit indexingProgressed( 100 );
return;
}
Expand All @@ -487,61 +593,23 @@ void IndexOperation::doIndex( LineOffset initialPosition )
const auto& config = Configuration::get();
const auto prefetchBufferSize = static_cast<size_t>( config.indexReadBufferSizeMb() );

LOG_INFO << "Prefetch buffer " << readableSize( prefetchBufferSize * IndexingBlockSize );

using namespace std::chrono;
using clock = high_resolution_clock;
microseconds ioDuration{};

const auto indexingStartTime = clock::now();

tbb::flow::graph indexingGraph;
using BlockData = std::pair<LineOffset::UnderlyingType, QByteArray>;

std::thread ioThread;
auto blockReaderAsync = tbb::flow::async_node<tbb::flow::continue_msg, BlockData>(
auto blockReaderAsync = BlockReader(
indexingGraph, tbb::flow::serial,
[ this, &ioThread, &file, &ioDuration ]( const auto&, auto& gateway ) {
gateway.reserve_wait();

ioThread = std::thread( [ this, &file, &ioDuration, gw = std::ref( gateway ) ] {
while ( !file.atEnd() ) {

if ( interruptRequest_ ) {
break;
}

BlockData blockData{ file.pos(),
QByteArray{ IndexingBlockSize, Qt::Uninitialized } };

clock::time_point ioT1 = clock::now();
const auto readBytes = static_cast<int>(
file.read( blockData.second.data(), blockData.second.size() ) );

if ( readBytes < 0 ) {
LOG_ERROR << "Reading past the end of file";
break;
}

if ( readBytes < blockData.second.size() ) {
blockData.second.resize( readBytes );
}

clock::time_point ioT2 = clock::now();

ioDuration += duration_cast<microseconds>( ioT2 - ioT1 );

LOG_DEBUG << "Sending block " << blockData.first << " size "
<< blockData.second.size();

while ( !gw.get().try_put( blockData ) ) {
std::this_thread::sleep_for( std::chrono::milliseconds( 1 ) );
}
}

auto lastBlock = std::make_pair( -1, QByteArray{} );
while ( !gw.get().try_put( lastBlock ) ) {
std::this_thread::sleep_for( std::chrono::milliseconds( 1 ) );
}

ioDuration = readFileInBlocks( file, gw.get() );
gw.get().release_wait();
} );
} );
Expand All @@ -551,43 +619,7 @@ void IndexOperation::doIndex( LineOffset initialPosition )

auto blockParser = tbb::flow::function_node<BlockData, tbb::flow::continue_msg>(
indexingGraph, 1, [ this, &state ]( const BlockData& blockData ) {
const auto& block_beginning = blockData.first;
const auto& block = blockData.second;

LOG_DEBUG << "Indexing block " << block_beginning << " start";

if ( block_beginning < 0 ) {
return tbb::flow::continue_msg{};
}

IndexingData::MutateAccessor scopedAccessor{ indexing_data_.get() };

guessEncoding( block, scopedAccessor, state );

if ( !block.isEmpty() ) {
const auto line_positions = parseDataBlock( block_beginning, block, state );
auto max_length = state.max_length;
if ( max_length > std::numeric_limits<LineLength::UnderlyingType>::max() ) {
LOG_ERROR << "Too long lines " << max_length;
max_length = std::numeric_limits<LineLength::UnderlyingType>::max();
}

scopedAccessor.addAll(
block, LineLength( static_cast<LineLength::UnderlyingType>( max_length ) ),
line_positions, state.encodingGuess );

// Update the caller for progress indication
const auto progress = ( state.file_size > 0 )
? calculateProgress( state.pos, state.file_size )
: 100;
emit indexingProgressed( progress );
}
else {
scopedAccessor.setEncodingGuess( state.encodingGuess );
}

LOG_DEBUG << "Indexing block " << block_beginning << " done";

indexNextBlock( state, blockData );
return tbb::flow::continue_msg{};
} );

Expand Down

0 comments on commit 72cfdc8

Please sign in to comment.