Skip to content

Commit

Permalink
Fix race when file changes during being indexed
Browse files Browse the repository at this point in the history
  • Loading branch information
variar committed Apr 2, 2021
1 parent 88f06e7 commit 2d1d5c4
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 65 deletions.
20 changes: 12 additions & 8 deletions src/logdata/include/data/logdataworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@

#include <QCryptographicHash>
#include <QFuture>
#include <QFutureWatcher>
#include <QMutex>
#include <QObject>
#include <QSemaphore>
Expand Down Expand Up @@ -238,12 +237,14 @@ class IndexOperation : public QObject {
{
}

// Start the indexing operation, returns true if it has been done
// Run the indexing operation, returns true if it has been done
// and false if it has been cancelled (results not copied)
virtual OperationResult start() = 0;
virtual OperationResult run() = 0;

signals:
void indexingProgressed( int );
void indexingFinished( bool );
void fileCheckFinished( MonitoredFileStatus );

protected:
// Returns the total size indexed
Expand All @@ -270,7 +271,7 @@ class FullIndexOperation : public IndexOperation {
, forcedEncoding_( forcedEncoding )
{
}
OperationResult start() override;
OperationResult run() override;

private:
QTextCodec* forcedEncoding_;
Expand All @@ -285,7 +286,7 @@ class PartialIndexOperation : public IndexOperation {
{
}

OperationResult start() override;
OperationResult run() override;
};

class CheckFileChangesOperation : public IndexOperation {
Expand All @@ -297,7 +298,10 @@ class CheckFileChangesOperation : public IndexOperation {
{
}

OperationResult start() override;
OperationResult run() override;

private:
MonitoredFileStatus doCheckFileChanges();
};

class LogDataWorker : public QObject {
Expand Down Expand Up @@ -337,13 +341,13 @@ class LogDataWorker : public QObject {
void checkFileChangesFinished( MonitoredFileStatus status );

private slots:
void onOperationFinished();
void onIndexingFinished( bool result );
void onCheckFileFinished( MonitoredFileStatus result );

private:
OperationResult connectSignalsAndRun( IndexOperation* operationRequested );

QFuture<OperationResult> operationFuture_;
QFutureWatcher<OperationResult> operationWatcher_;

AtomicFlag interruptRequest_;

Expand Down
12 changes: 6 additions & 6 deletions src/logdata/include/data/logfiltereddataworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
#include <QRegularExpression>

#include <QFuture>
#include <QFutureWatcher>

#include <immer/flex_vector.hpp>

Expand Down Expand Up @@ -134,12 +133,13 @@ class SearchOperation : public QObject {
SearchOperation( const LogData& sourceLogData, AtomicFlag& interruptRequested,
const QRegularExpression& regExp, LineNumber startLine, LineNumber endLine );

// Start the search operation, returns true if it has been done
// Run the search operation, returns true if it has been done
// and false if it has been cancelled (results not copied)
virtual void start( SearchData& result ) = 0;
virtual void run( SearchData& result ) = 0;

signals:
void searchProgressed( LinesCount nbMatches, int percent, LineNumber initialLine );
void searchFinished();

protected:
// Implement the common part of the search, passing
Expand All @@ -163,7 +163,8 @@ class FullSearchOperation : public SearchOperation {
{
}

void start( SearchData& result ) override;
void run( SearchData& result ) override;

};

class UpdateSearchOperation : public SearchOperation {
Expand All @@ -177,7 +178,7 @@ class UpdateSearchOperation : public SearchOperation {
{
}

void start( SearchData& result ) override;
void run( SearchData& result ) override;

private:
LineNumber initialPosition_;
Expand Down Expand Up @@ -221,7 +222,6 @@ class LogFilteredDataWorker : public QObject {
// Mutex to protect operationRequested_ and friends
Lock mutex_;
QFuture<void> operationFuture_;
QFutureWatcher<void> operationWatcher_;

// Shared indexing data
SearchData searchData_;
Expand Down
91 changes: 52 additions & 39 deletions src/logdata/src/logdataworker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,6 @@ size_t IndexingData::allocatedSize() const
LogDataWorker::LogDataWorker( IndexingData& indexing_data )
: indexing_data_( indexing_data )
{
connect( &operationWatcher_, &QFutureWatcher<OperationResult>::finished, this,
&LogDataWorker::onOperationFinished, Qt::QueuedConnection );
}

LogDataWorker::~LogDataWorker()
Expand All @@ -222,59 +220,63 @@ void LogDataWorker::indexAll( QTextCodec* forcedEncoding )
ScopedLock locker( &mutex_ );
LOG( logDEBUG ) << "FullIndex requested";

operationWatcher_.waitForFinished();
operationFuture_.waitForFinished();
interruptRequest_.clear();

operationFuture_ = QtConcurrent::run( [this, forcedEncoding, fileName = fileName_] {
auto operationRequested = std::make_unique<FullIndexOperation>(
fileName, indexing_data_, interruptRequest_, forcedEncoding );
return connectSignalsAndRun( operationRequested.get() );
} );

operationWatcher_.setFuture( operationFuture_ );
}

void LogDataWorker::indexAdditionalLines()
{
ScopedLock locker( &mutex_ );
LOG( logDEBUG ) << "AddLines requested";

operationWatcher_.waitForFinished();
operationFuture_.waitForFinished();
interruptRequest_.clear();

operationFuture_ = QtConcurrent::run( [this, fileName = fileName_] {
auto operationRequested = std::make_unique<PartialIndexOperation>( fileName, indexing_data_,
interruptRequest_ );
return connectSignalsAndRun( operationRequested.get() );
} );

operationWatcher_.setFuture( operationFuture_ );
}

void LogDataWorker::checkFileChanges()
{
ScopedLock locker( &mutex_ );
LOG( logDEBUG ) << "Check file changes requested";

operationWatcher_.waitForFinished();
operationFuture_.waitForFinished();
interruptRequest_.clear();

operationFuture_ = QtConcurrent::run( [this, fileName = fileName_] {
auto operationRequested = std::make_unique<CheckFileChangesOperation>(
fileName, indexing_data_, interruptRequest_ );

return operationRequested->start();
return connectSignalsAndRun( operationRequested.get() );
} );

operationWatcher_.setFuture( operationFuture_ );
}

OperationResult LogDataWorker::connectSignalsAndRun( IndexOperation* operationRequested )
{
connect( operationRequested, &IndexOperation::indexingProgressed, this,
&LogDataWorker::indexingProgressed );

return operationRequested->start();
connect( operationRequested, &IndexOperation::indexingFinished, this,
&LogDataWorker::onIndexingFinished, Qt::QueuedConnection );

connect( operationRequested, &IndexOperation::fileCheckFinished, this,
&LogDataWorker::onCheckFileFinished, Qt::QueuedConnection );

auto result = operationRequested->run();

operationRequested->disconnect( this );

return result;
}

void LogDataWorker::interrupt()
Expand All @@ -283,25 +285,22 @@ void LogDataWorker::interrupt()
interruptRequest_.set();
}

void LogDataWorker::onOperationFinished()
void LogDataWorker::onIndexingFinished( bool result )
{
const auto variantResult = operationWatcher_.result();
absl::visit( make_visitor(
[this]( bool result ) {
if ( result ) {
LOG( logDEBUG ) << "... finished copy in workerThread.";
emit indexingFinished( LoadingStatus::Successful );
}
else {
LOG( logINFO ) << "indexing interrupted";
emit indexingFinished( LoadingStatus::Interrupted );
}
},
[this]( MonitoredFileStatus result ) {
LOG( logINFO ) << "checking file finished";
emit checkFileChangesFinished( result );
} ),
variantResult );
if ( result ) {
LOG( logINFO ) << "finished indexing in worker thread";
emit indexingFinished( LoadingStatus::Successful );
}
else {
LOG( logINFO ) << "indexing interrupted in worker thread";
emit indexingFinished( LoadingStatus::Interrupted );
}
}

void LogDataWorker::onCheckFileFinished( const MonitoredFileStatus result )
{
LOG( logINFO ) << "checking file finished in worker thread";
emit checkFileChangesFinished( result );
}

//
Expand Down Expand Up @@ -588,15 +587,19 @@ void IndexOperation::doIndex( LineOffset initialPosition )
/ ( 1024 * 1024 )
<< " MiB/s";

if ( interruptRequest_ ) {
scopedAccessor.clear();
}

if ( !scopedAccessor.getEncodingGuess() ) {
scopedAccessor.setEncodingGuess( QTextCodec::codecForLocale() );
}
}

// Called in the worker thread's context
OperationResult FullIndexOperation::start()
OperationResult FullIndexOperation::run()
{
LOG( logDEBUG ) << "FullIndexOperation::start(), file " << fileName_.toStdString();
LOG( logDEBUG ) << "FullIndexOperation::run(), file " << fileName_.toStdString();

LOG( logDEBUG ) << "FullIndexOperation: Starting the count...";

Expand All @@ -614,12 +617,14 @@ OperationResult FullIndexOperation::start()
"interrupt = "
<< static_cast<bool>( interruptRequest_ );

return ( interruptRequest_ ? false : true );
const auto result = interruptRequest_ ? false : true;
emit indexingFinished( result );
return result;
}

OperationResult PartialIndexOperation::start()
OperationResult PartialIndexOperation::run()
{
LOG( logDEBUG ) << "PartialIndexOperation::start(), file " << fileName_.toStdString();
LOG( logDEBUG ) << "PartialIndexOperation::run(), file " << fileName_.toStdString();

const auto initial_position
= LineOffset( IndexingData::ConstAccessor{ &indexing_data_ }.getIndexedSize() );
Expand All @@ -633,13 +638,21 @@ OperationResult PartialIndexOperation::start()

LOG( logDEBUG ) << "PartialIndexOperation: ... finished counting.";

return ( interruptRequest_ ? false : true );
const auto result = interruptRequest_ ? false : true;
emit indexingFinished( result );
return result;
}

OperationResult CheckFileChangesOperation::start()
OperationResult CheckFileChangesOperation::run()
{
LOG( logINFO ) << "CheckFileChangesOperation::start(), file " << fileName_.toStdString();
LOG( logINFO ) << "CheckFileChangesOperation::run(), file " << fileName_.toStdString();
const auto result = doCheckFileChanges();
emit fileCheckFinished( result );
return result;
}

MonitoredFileStatus CheckFileChangesOperation::doCheckFileChanges()
{
QFileInfo info( fileName_ );
const auto indexedHash = IndexingData::ConstAccessor{ &indexing_data_ }.getHash();
const auto realFileSize = info.size();
Expand Down
22 changes: 10 additions & 12 deletions src/logdata/src/logfiltereddataworker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,23 +223,24 @@ LogFilteredDataWorker::LogFilteredDataWorker( const LogData& sourceLogData )
, mutex_()
, searchData_()
{
connect( &operationWatcher_, &QFutureWatcher<void>::finished, this,
&LogFilteredDataWorker::searchFinished );
}

LogFilteredDataWorker::~LogFilteredDataWorker()
{
interruptRequested_.set();
ScopedLock locker( &mutex_ );
operationWatcher_.waitForFinished();
operationFuture_.waitForFinished();
}

void LogFilteredDataWorker::connectSignalsAndRun( SearchOperation* operationRequested )
{
connect( operationRequested, &SearchOperation::searchProgressed, this,
&LogFilteredDataWorker::searchProgressed );
connect( operationRequested, &SearchOperation::searchFinished, this,
&LogFilteredDataWorker::searchFinished, Qt::QueuedConnection );

operationRequested->start( searchData_ );
operationRequested->run( searchData_ );
operationRequested->disconnect(this);
}

void LogFilteredDataWorker::search( const QRegularExpression& regExp, LineNumber startLine,
Expand All @@ -249,16 +250,14 @@ void LogFilteredDataWorker::search( const QRegularExpression& regExp, LineNumber

LOG( logINFO ) << "Search requested";

operationWatcher_.waitForFinished();
operationFuture_.waitForFinished();
interruptRequested_.clear();

operationFuture_ = QtConcurrent::run( [this, regExp, startLine, endLine] {
auto operationRequested = std::make_unique<FullSearchOperation>(
sourceLogData_, interruptRequested_, regExp, startLine, endLine );
connectSignalsAndRun( operationRequested.get() );
} );

operationWatcher_.setFuture( operationFuture_ );
}

void LogFilteredDataWorker::updateSearch( const QRegularExpression& regExp, LineNumber startLine,
Expand All @@ -268,16 +267,14 @@ void LogFilteredDataWorker::updateSearch( const QRegularExpression& regExp, Line

LOG( logINFO ) << "Search update requested from " << position.get();

operationWatcher_.waitForFinished();
operationFuture_.waitForFinished();
interruptRequested_.clear();

operationFuture_ = QtConcurrent::run( [this, regExp, startLine, endLine, position] {
auto operationRequested = std::make_unique<UpdateSearchOperation>(
sourceLogData_, interruptRequested_, regExp, startLine, endLine, position );
connectSignalsAndRun( operationRequested.get() );
} );

operationWatcher_.setFuture( operationFuture_ );
}

void LogFilteredDataWorker::interrupt()
Expand Down Expand Up @@ -513,10 +510,11 @@ void SearchOperation::doSearch( SearchData& searchData, LineNumber initialLine )
<< " lines/s";

emit searchProgressed( nbMatches, 100, initialLine );
emit searchFinished();
}

// Called in the worker thread's context
void FullSearchOperation::start( SearchData& searchData )
void FullSearchOperation::run( SearchData& searchData )
{
// Clear the shared data
searchData.clear();
Expand All @@ -525,7 +523,7 @@ void FullSearchOperation::start( SearchData& searchData )
}

// Called in the worker thread's context
void UpdateSearchOperation::start( SearchData& searchData )
void UpdateSearchOperation::run( SearchData& searchData )
{
auto initial_line = qMax( searchData.getLastProcessedLine(), initialPosition_ );

Expand Down

0 comments on commit 2d1d5c4

Please sign in to comment.