1 /* 2 * Copyright (C) 2009, 2010, 2014 Nicolas Bonnefon and other contributors 3 * 4 * This file is part of glogg. 5 * 6 * glogg is free software: you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License as published by 8 * the Free Software Foundation, either version 3 of the License, or 9 * (at your option) any later version. 10 * 11 * glogg is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 * GNU General Public License for more details. 15 * 16 * You should have received a copy of the GNU General Public License 17 * along with glogg. If not, see <http://www.gnu.org/licenses/>. 18 */ 19 20 #include <QFile> 21 22 #include "log.h" 23 24 #include "logdata.h" 25 #include "logdataworkerthread.h" 26 27 // Size of the chunk to read (5 MiB) 28 const int IndexOperation::sizeChunk = 5*1024*1024; 29 30 void IndexingData::getAll( qint64* size, int* length, 31 LinePositionArray* linePosition ) 32 { 33 QMutexLocker locker( &dataMutex_ ); 34 35 *size = indexedSize_; 36 *length = maxLength_; 37 // FIXME: slooooooooow 38 *linePosition = linePosition_; 39 } 40 41 void IndexingData::setAll( qint64 size, int length, 42 LinePositionArray& linePosition ) 43 { 44 QMutexLocker locker( &dataMutex_ ); 45 46 indexedSize_ = size; 47 maxLength_ = length; 48 linePosition_ = std::move( linePosition ); 49 } 50 51 void IndexingData::addAll( qint64 size, int length, 52 const LinePositionArray& linePosition ) 53 { 54 QMutexLocker locker( &dataMutex_ ); 55 56 indexedSize_ += size; 57 maxLength_ = qMax( maxLength_, length ); 58 linePosition_.append_list( linePosition ); 59 } 60 61 LogDataWorkerThread::LogDataWorkerThread() 62 : QThread(), mutex_(), operationRequestedCond_(), 63 nothingToDoCond_(), fileName_(), indexingData_() 64 { 65 terminate_ = false; 66 interruptRequested_ = false; 67 operationRequested_ = NULL; 68 } 69 70 LogDataWorkerThread::~LogDataWorkerThread() 71 { 72 { 73 QMutexLocker locker( &mutex_ ); 74 terminate_ = true; 75 operationRequestedCond_.wakeAll(); 76 } 77 wait(); 78 } 79 80 void LogDataWorkerThread::attachFile( const QString& fileName ) 81 { 82 QMutexLocker locker( &mutex_ ); // to protect fileName_ 83 84 fileName_ = fileName; 85 } 86 87 void LogDataWorkerThread::indexAll() 88 { 89 QMutexLocker locker( &mutex_ ); // to protect operationRequested_ 90 91 LOG(logDEBUG) << "FullIndex requested"; 92 93 // If an operation is ongoing, we will block 94 while ( (operationRequested_ != NULL) ) 95 nothingToDoCond_.wait( &mutex_ ); 96 97 interruptRequested_ = false; 98 operationRequested_ = new FullIndexOperation( fileName_, &interruptRequested_ ); 99 operationRequestedCond_.wakeAll(); 100 } 101 102 void LogDataWorkerThread::indexAdditionalLines( qint64 position ) 103 { 104 QMutexLocker locker( &mutex_ ); // to protect operationRequested_ 105 106 LOG(logDEBUG) << "AddLines requested"; 107 108 // If an operation is ongoing, we will block 109 while ( (operationRequested_ != NULL) ) 110 nothingToDoCond_.wait( &mutex_ ); 111 112 interruptRequested_ = false; 113 operationRequested_ = new PartialIndexOperation( fileName_, &interruptRequested_, position ); 114 operationRequestedCond_.wakeAll(); 115 } 116 117 void LogDataWorkerThread::interrupt() 118 { 119 LOG(logDEBUG) << "Load interrupt requested"; 120 121 // No mutex here, setting a bool is probably atomic! 122 interruptRequested_ = true; 123 } 124 125 // This will do an atomic copy of the object 126 // (hopefully fast as we use Qt containers) 127 void LogDataWorkerThread::getIndexingData( 128 qint64* indexedSize, int* maxLength, LinePositionArray* linePosition ) 129 { 130 indexingData_.getAll( indexedSize, maxLength, linePosition ); 131 } 132 133 // This is the thread's main loop 134 void LogDataWorkerThread::run() 135 { 136 QMutexLocker locker( &mutex_ ); 137 138 forever { 139 while ( (terminate_ == false) && (operationRequested_ == NULL) ) 140 operationRequestedCond_.wait( &mutex_ ); 141 LOG(logDEBUG) << "Worker thread signaled"; 142 143 // Look at what needs to be done 144 if ( terminate_ ) 145 return; // We must die 146 147 if ( operationRequested_ ) { 148 connect( operationRequested_, SIGNAL( indexingProgressed( int ) ), 149 this, SIGNAL( indexingProgressed( int ) ) ); 150 151 // Run the operation 152 try { 153 if ( operationRequested_->start( indexingData_ ) ) { 154 LOG(logDEBUG) << "... finished copy in workerThread."; 155 emit indexingFinished( LoadingStatus::Successful ); 156 } 157 else { 158 emit indexingFinished( LoadingStatus::Interrupted ); 159 } 160 } 161 catch ( std::bad_alloc& ba ) { 162 LOG(logERROR) << "Out of memory whilst indexing!"; 163 emit indexingFinished( LoadingStatus::NoMemory ); 164 } 165 166 delete operationRequested_; 167 operationRequested_ = NULL; 168 nothingToDoCond_.wakeAll(); 169 } 170 } 171 } 172 173 // 174 // Operations implementation 175 // 176 177 IndexOperation::IndexOperation( QString& fileName, bool* interruptRequest ) 178 : fileName_( fileName ) 179 { 180 interruptRequest_ = interruptRequest; 181 } 182 183 PartialIndexOperation::PartialIndexOperation( QString& fileName, 184 bool* interruptRequest, qint64 position ) 185 : IndexOperation( fileName, interruptRequest ) 186 { 187 initialPosition_ = position; 188 } 189 190 qint64 IndexOperation::doIndex( LinePositionArray& linePosition, int* maxLength, 191 qint64 initialPosition ) 192 { 193 int max_length = *maxLength; 194 qint64 pos = initialPosition; // Absolute position of the start of current line 195 qint64 end = 0; // Absolute position of the end of current line 196 int additional_spaces = 0; // Additional spaces due to tabs 197 198 QFile file( fileName_ ); 199 if ( file.open( QIODevice::ReadOnly ) ) { 200 // Count the number of lines and max length 201 // (read big chunks to speed up reading from disk) 202 file.seek( pos ); 203 while ( !file.atEnd() ) { 204 if ( *interruptRequest_ ) // a bool is always read/written atomically isn't it? 205 break; 206 207 // Read a chunk of 5MB 208 const qint64 block_beginning = file.pos(); 209 const QByteArray block = file.read( sizeChunk ); 210 211 // Count the number of lines in each chunk 212 qint64 pos_within_block = 0; 213 while ( pos_within_block != -1 ) { 214 pos_within_block = qMax( pos - block_beginning, 0LL); 215 // Looking for the next \n, expanding tabs in the process 216 do { 217 if ( pos_within_block < block.length() ) { 218 const char c = block.at(pos_within_block); 219 if ( c == '\n' ) 220 break; 221 else if ( c == '\t' ) 222 additional_spaces += AbstractLogData::tabStop - 223 ( ( ( block_beginning - pos ) + pos_within_block 224 + additional_spaces ) % AbstractLogData::tabStop ) - 1; 225 226 pos_within_block++; 227 } 228 else { 229 pos_within_block = -1; 230 } 231 } while ( pos_within_block != -1 ); 232 233 // When a end of line has been found... 234 if ( pos_within_block != -1 ) { 235 end = pos_within_block + block_beginning; 236 const int length = end-pos + additional_spaces; 237 if ( length > max_length ) 238 max_length = length; 239 pos = end + 1; 240 additional_spaces = 0; 241 linePosition.append( pos ); 242 } 243 } 244 245 // Update the caller for progress indication 246 int progress = ( file.size() > 0 ) ? pos*100 / file.size() : 100; 247 emit indexingProgressed( progress ); 248 } 249 250 // Check if there is a non LF terminated line at the end of the file 251 if ( file.size() > pos ) { 252 LOG( logWARNING ) << 253 "Non LF terminated file, adding a fake end of line"; 254 linePosition.append( file.size() + 1 ); 255 linePosition.setFakeFinalLF(); 256 } 257 } 258 else { 259 // TODO: Check that the file is seekable? 260 // If the file cannot be open, we do as if it was empty 261 LOG(logWARNING) << "Cannot open file " << fileName_.toStdString(); 262 263 emit indexingProgressed( 100 ); 264 } 265 266 *maxLength = max_length; 267 268 return file.size(); 269 } 270 271 // Called in the worker thread's context 272 // Should not use any shared variable 273 bool FullIndexOperation::start( IndexingData& sharedData ) 274 { 275 LOG(logDEBUG) << "FullIndexOperation::start(), file " 276 << fileName_.toStdString(); 277 278 LOG(logDEBUG) << "FullIndexOperation: Starting the count..."; 279 int maxLength = 0; 280 LinePositionArray linePosition = {}; 281 282 emit indexingProgressed( 0 ); 283 284 qint64 size = doIndex( linePosition, &maxLength, 0 ); 285 286 if ( *interruptRequest_ == false ) 287 { 288 // Commit the results to the shared data (atomically) 289 sharedData.setAll( size, maxLength, linePosition ); 290 } 291 292 LOG(logDEBUG) << "FullIndexOperation: ... finished counting." 293 "interrupt = " << *interruptRequest_; 294 295 return ( *interruptRequest_ ? false : true ); 296 } 297 298 bool PartialIndexOperation::start( IndexingData& sharedData ) 299 { 300 LOG(logDEBUG) << "PartialIndexOperation::start(), file " 301 << fileName_.toStdString(); 302 303 LOG(logDEBUG) << "PartialIndexOperation: Starting the count at " 304 << initialPosition_ << " ..."; 305 int maxLength = 0; 306 LinePositionArray linePosition = {}; 307 308 emit indexingProgressed( 0 ); 309 310 qint64 size = doIndex( linePosition, &maxLength, initialPosition_ ); 311 312 if ( *interruptRequest_ == false ) 313 { 314 // Commit the results to the shared data (atomically) 315 sharedData.addAll( size - initialPosition_, maxLength, linePosition ); 316 } 317 318 LOG(logDEBUG) << "PartialIndexOperation: ... finished counting."; 319 320 return ( *interruptRequest_ ? false : true ); 321 } 322