/* * Copyright (C) 2009, 2010, 2014 Nicolas Bonnefon and other contributors * * This file is part of glogg. * * glogg is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * glogg is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with glogg. If not, see . */ #include #include "log.h" #include "logdata.h" #include "logdataworkerthread.h" // Size of the chunk to read (5 MiB) const int IndexOperation::sizeChunk = 5*1024*1024; void IndexingData::getAll( qint64* size, int* length, LinePositionArray* linePosition ) { QMutexLocker locker( &dataMutex_ ); *size = indexedSize_; *length = maxLength_; *linePosition = std::move( linePosition_ ); } void IndexingData::setAll( qint64 size, int length, LinePositionArray& linePosition ) { QMutexLocker locker( &dataMutex_ ); indexedSize_ = size; maxLength_ = length; linePosition_ = std::move( linePosition ); } void IndexingData::addAll( qint64 size, int length, const LinePositionArray& linePosition ) { QMutexLocker locker( &dataMutex_ ); indexedSize_ += size; maxLength_ = qMax( maxLength_, length ); linePosition_.append_list( linePosition ); } LogDataWorkerThread::LogDataWorkerThread() : QThread(), mutex_(), operationRequestedCond_(), nothingToDoCond_(), fileName_(), indexingData_() { terminate_ = false; interruptRequested_ = false; operationRequested_ = NULL; } LogDataWorkerThread::~LogDataWorkerThread() { { QMutexLocker locker( &mutex_ ); terminate_ = true; operationRequestedCond_.wakeAll(); } wait(); } void LogDataWorkerThread::attachFile( const QString& fileName ) { QMutexLocker locker( &mutex_ ); // to protect fileName_ fileName_ = fileName; } void LogDataWorkerThread::indexAll() { QMutexLocker locker( &mutex_ ); // to protect operationRequested_ LOG(logDEBUG) << "FullIndex requested"; // If an operation is ongoing, we will block while ( (operationRequested_ != NULL) ) nothingToDoCond_.wait( &mutex_ ); interruptRequested_ = false; operationRequested_ = new FullIndexOperation( fileName_, &interruptRequested_ ); operationRequestedCond_.wakeAll(); } void LogDataWorkerThread::indexAdditionalLines( qint64 position ) { QMutexLocker locker( &mutex_ ); // to protect operationRequested_ LOG(logDEBUG) << "AddLines requested"; // If an operation is ongoing, we will block while ( (operationRequested_ != NULL) ) nothingToDoCond_.wait( &mutex_ ); interruptRequested_ = false; operationRequested_ = new PartialIndexOperation( fileName_, &interruptRequested_, position ); operationRequestedCond_.wakeAll(); } void LogDataWorkerThread::interrupt() { LOG(logDEBUG) << "Load interrupt requested"; // No mutex here, setting a bool is probably atomic! interruptRequested_ = true; } // This will do an atomic copy of the object // (hopefully fast as we use Qt containers) void LogDataWorkerThread::getIndexingData( qint64* indexedSize, int* maxLength, LinePositionArray* linePosition ) { indexingData_.getAll( indexedSize, maxLength, linePosition ); } // This is the thread's main loop void LogDataWorkerThread::run() { QMutexLocker locker( &mutex_ ); forever { while ( (terminate_ == false) && (operationRequested_ == NULL) ) operationRequestedCond_.wait( &mutex_ ); LOG(logDEBUG) << "Worker thread signaled"; // Look at what needs to be done if ( terminate_ ) return; // We must die if ( operationRequested_ ) { connect( operationRequested_, SIGNAL( indexingProgressed( int ) ), this, SIGNAL( indexingProgressed( int ) ) ); // Run the operation try { if ( operationRequested_->start( indexingData_ ) ) { LOG(logDEBUG) << "... finished copy in workerThread."; emit indexingFinished( LoadingStatus::Successful ); } else { emit indexingFinished( LoadingStatus::Interrupted ); } } catch ( std::bad_alloc& ba ) { LOG(logERROR) << "Out of memory whilst indexing!"; emit indexingFinished( LoadingStatus::NoMemory ); } delete operationRequested_; operationRequested_ = NULL; nothingToDoCond_.wakeAll(); } } } // // Operations implementation // IndexOperation::IndexOperation( QString& fileName, bool* interruptRequest ) : fileName_( fileName ) { interruptRequest_ = interruptRequest; } PartialIndexOperation::PartialIndexOperation( QString& fileName, bool* interruptRequest, qint64 position ) : IndexOperation( fileName, interruptRequest ) { initialPosition_ = position; } qint64 IndexOperation::doIndex( LinePositionArray& linePosition, int* maxLength, qint64 initialPosition ) { int max_length = *maxLength; qint64 pos = initialPosition; // Absolute position of the start of current line qint64 end = 0; // Absolute position of the end of current line int additional_spaces = 0; // Additional spaces due to tabs QFile file( fileName_ ); if ( file.open( QIODevice::ReadOnly ) ) { // Count the number of lines and max length // (read big chunks to speed up reading from disk) file.seek( pos ); while ( !file.atEnd() ) { if ( *interruptRequest_ ) // a bool is always read/written atomically isn't it? break; // Read a chunk of 5MB const qint64 block_beginning = file.pos(); const QByteArray block = file.read( sizeChunk ); // Count the number of lines in each chunk qint64 pos_within_block = 0; while ( pos_within_block != -1 ) { pos_within_block = qMax( pos - block_beginning, 0LL); // Looking for the next \n, expanding tabs in the process do { if ( pos_within_block < block.length() ) { const char c = block.at(pos_within_block); if ( c == '\n' ) break; else if ( c == '\t' ) additional_spaces += AbstractLogData::tabStop - ( ( ( block_beginning - pos ) + pos_within_block + additional_spaces ) % AbstractLogData::tabStop ) - 1; pos_within_block++; } else { pos_within_block = -1; } } while ( pos_within_block != -1 ); // When a end of line has been found... if ( pos_within_block != -1 ) { end = pos_within_block + block_beginning; const int length = end-pos + additional_spaces; if ( length > max_length ) max_length = length; pos = end + 1; additional_spaces = 0; linePosition.append( pos ); } } // Update the caller for progress indication int progress = ( file.size() > 0 ) ? pos*100 / file.size() : 100; emit indexingProgressed( progress ); } // Check if there is a non LF terminated line at the end of the file if ( file.size() > pos ) { LOG( logWARNING ) << "Non LF terminated file, adding a fake end of line"; linePosition.append( file.size() + 1 ); linePosition.setFakeFinalLF(); } } else { // TODO: Check that the file is seekable? // If the file cannot be open, we do as if it was empty LOG(logWARNING) << "Cannot open file " << fileName_.toStdString(); emit indexingProgressed( 100 ); } *maxLength = max_length; return file.size(); } // Called in the worker thread's context // Should not use any shared variable bool FullIndexOperation::start( IndexingData& sharedData ) { LOG(logDEBUG) << "FullIndexOperation::start(), file " << fileName_.toStdString(); LOG(logDEBUG) << "FullIndexOperation: Starting the count..."; int maxLength = 0; LinePositionArray linePosition = {}; emit indexingProgressed( 0 ); qint64 size = doIndex( linePosition, &maxLength, 0 ); if ( *interruptRequest_ == false ) { // Commit the results to the shared data (atomically) sharedData.setAll( size, maxLength, linePosition ); } LOG(logDEBUG) << "FullIndexOperation: ... finished counting." "interrupt = " << *interruptRequest_; return ( *interruptRequest_ ? false : true ); } bool PartialIndexOperation::start( IndexingData& sharedData ) { LOG(logDEBUG) << "PartialIndexOperation::start(), file " << fileName_.toStdString(); LOG(logDEBUG) << "PartialIndexOperation: Starting the count at " << initialPosition_ << " ..."; int maxLength = 0; LinePositionArray linePosition = {}; emit indexingProgressed( 0 ); qint64 size = doIndex( linePosition, &maxLength, initialPosition_ ); if ( *interruptRequest_ == false ) { // Commit the results to the shared data (atomically) sharedData.addAll( size - initialPosition_, maxLength, linePosition ); } LOG(logDEBUG) << "PartialIndexOperation: ... finished counting."; return ( *interruptRequest_ ? false : true ); }