1 /* 2 * Copyright (C) 2009, 2010, 2014 Nicolas Bonnefon and other contributors 3 * 4 * This file is part of glogg. 5 * 6 * glogg is free software: you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License as published by 8 * the Free Software Foundation, either version 3 of the License, or 9 * (at your option) any later version. 10 * 11 * glogg is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 * GNU General Public License for more details. 15 * 16 * You should have received a copy of the GNU General Public License 17 * along with glogg. If not, see <http://www.gnu.org/licenses/>. 18 */ 19 20 #include <QFile> 21 22 #include "log.h" 23 24 #include "logdata.h" 25 #include "logdataworkerthread.h" 26 27 // Size of the chunk to read (5 MiB) 28 const int IndexOperation::sizeChunk = 5*1024*1024; 29 30 void IndexingData::getAll( qint64* size, int* length, 31 LinePositionArray* linePosition ) 32 { 33 QMutexLocker locker( &dataMutex_ ); 34 35 *size = indexedSize_; 36 *length = maxLength_; 37 *linePosition = std::move( linePosition_ ); 38 } 39 40 void IndexingData::setAll( qint64 size, int length, 41 LinePositionArray& linePosition ) 42 { 43 QMutexLocker locker( &dataMutex_ ); 44 45 indexedSize_ = size; 46 maxLength_ = length; 47 linePosition_ = std::move( linePosition ); 48 } 49 50 void IndexingData::addAll( qint64 size, int length, 51 const LinePositionArray& linePosition ) 52 { 53 QMutexLocker locker( &dataMutex_ ); 54 55 indexedSize_ += size; 56 maxLength_ = qMax( maxLength_, length ); 57 linePosition_.append_list( linePosition ); 58 } 59 60 LogDataWorkerThread::LogDataWorkerThread() 61 : QThread(), mutex_(), operationRequestedCond_(), 62 nothingToDoCond_(), fileName_(), indexingData_() 63 { 64 terminate_ = false; 65 interruptRequested_ = false; 66 operationRequested_ = NULL; 67 } 68 69 LogDataWorkerThread::~LogDataWorkerThread() 70 { 71 { 72 QMutexLocker locker( &mutex_ ); 73 terminate_ = true; 74 operationRequestedCond_.wakeAll(); 75 } 76 wait(); 77 } 78 79 void LogDataWorkerThread::attachFile( const QString& fileName ) 80 { 81 QMutexLocker locker( &mutex_ ); // to protect fileName_ 82 83 fileName_ = fileName; 84 } 85 86 void LogDataWorkerThread::indexAll() 87 { 88 QMutexLocker locker( &mutex_ ); // to protect operationRequested_ 89 90 LOG(logDEBUG) << "FullIndex requested"; 91 92 // If an operation is ongoing, we will block 93 while ( (operationRequested_ != NULL) ) 94 nothingToDoCond_.wait( &mutex_ ); 95 96 interruptRequested_ = false; 97 operationRequested_ = new FullIndexOperation( fileName_, &interruptRequested_ ); 98 operationRequestedCond_.wakeAll(); 99 } 100 101 void LogDataWorkerThread::indexAdditionalLines( qint64 position ) 102 { 103 QMutexLocker locker( &mutex_ ); // to protect operationRequested_ 104 105 LOG(logDEBUG) << "AddLines requested"; 106 107 // If an operation is ongoing, we will block 108 while ( (operationRequested_ != NULL) ) 109 nothingToDoCond_.wait( &mutex_ ); 110 111 interruptRequested_ = false; 112 operationRequested_ = new PartialIndexOperation( fileName_, &interruptRequested_, position ); 113 operationRequestedCond_.wakeAll(); 114 } 115 116 void LogDataWorkerThread::interrupt() 117 { 118 LOG(logDEBUG) << "Load interrupt requested"; 119 120 // No mutex here, setting a bool is probably atomic! 121 interruptRequested_ = true; 122 } 123 124 // This will do an atomic copy of the object 125 // (hopefully fast as we use Qt containers) 126 void LogDataWorkerThread::getIndexingData( 127 qint64* indexedSize, int* maxLength, LinePositionArray* linePosition ) 128 { 129 indexingData_.getAll( indexedSize, maxLength, linePosition ); 130 } 131 132 // This is the thread's main loop 133 void LogDataWorkerThread::run() 134 { 135 QMutexLocker locker( &mutex_ ); 136 137 forever { 138 while ( (terminate_ == false) && (operationRequested_ == NULL) ) 139 operationRequestedCond_.wait( &mutex_ ); 140 LOG(logDEBUG) << "Worker thread signaled"; 141 142 // Look at what needs to be done 143 if ( terminate_ ) 144 return; // We must die 145 146 if ( operationRequested_ ) { 147 connect( operationRequested_, SIGNAL( indexingProgressed( int ) ), 148 this, SIGNAL( indexingProgressed( int ) ) ); 149 150 // Run the operation 151 try { 152 if ( operationRequested_->start( indexingData_ ) ) { 153 LOG(logDEBUG) << "... finished copy in workerThread."; 154 emit indexingFinished( LoadingStatus::Successful ); 155 } 156 else { 157 emit indexingFinished( LoadingStatus::Interrupted ); 158 } 159 } 160 catch ( std::bad_alloc& ba ) { 161 LOG(logERROR) << "Out of memory whilst indexing!"; 162 emit indexingFinished( LoadingStatus::NoMemory ); 163 } 164 165 delete operationRequested_; 166 operationRequested_ = NULL; 167 nothingToDoCond_.wakeAll(); 168 } 169 } 170 } 171 172 // 173 // Operations implementation 174 // 175 176 IndexOperation::IndexOperation( QString& fileName, bool* interruptRequest ) 177 : fileName_( fileName ) 178 { 179 interruptRequest_ = interruptRequest; 180 } 181 182 PartialIndexOperation::PartialIndexOperation( QString& fileName, 183 bool* interruptRequest, qint64 position ) 184 : IndexOperation( fileName, interruptRequest ) 185 { 186 initialPosition_ = position; 187 } 188 189 qint64 IndexOperation::doIndex( LinePositionArray& linePosition, int* maxLength, 190 qint64 initialPosition ) 191 { 192 int max_length = *maxLength; 193 qint64 pos = initialPosition; // Absolute position of the start of current line 194 qint64 end = 0; // Absolute position of the end of current line 195 int additional_spaces = 0; // Additional spaces due to tabs 196 197 QFile file( fileName_ ); 198 if ( file.open( QIODevice::ReadOnly ) ) { 199 // Count the number of lines and max length 200 // (read big chunks to speed up reading from disk) 201 file.seek( pos ); 202 while ( !file.atEnd() ) { 203 if ( *interruptRequest_ ) // a bool is always read/written atomically isn't it? 204 break; 205 206 // Read a chunk of 5MB 207 const qint64 block_beginning = file.pos(); 208 const QByteArray block = file.read( sizeChunk ); 209 210 // Count the number of lines in each chunk 211 qint64 pos_within_block = 0; 212 while ( pos_within_block != -1 ) { 213 pos_within_block = qMax( pos - block_beginning, 0LL); 214 // Looking for the next \n, expanding tabs in the process 215 do { 216 if ( pos_within_block < block.length() ) { 217 const char c = block.at(pos_within_block); 218 if ( c == '\n' ) 219 break; 220 else if ( c == '\t' ) 221 additional_spaces += AbstractLogData::tabStop - 222 ( ( ( block_beginning - pos ) + pos_within_block 223 + additional_spaces ) % AbstractLogData::tabStop ) - 1; 224 225 pos_within_block++; 226 } 227 else { 228 pos_within_block = -1; 229 } 230 } while ( pos_within_block != -1 ); 231 232 // When a end of line has been found... 233 if ( pos_within_block != -1 ) { 234 end = pos_within_block + block_beginning; 235 const int length = end-pos + additional_spaces; 236 if ( length > max_length ) 237 max_length = length; 238 pos = end + 1; 239 additional_spaces = 0; 240 linePosition.append( pos ); 241 } 242 } 243 244 // Update the caller for progress indication 245 int progress = ( file.size() > 0 ) ? pos*100 / file.size() : 100; 246 emit indexingProgressed( progress ); 247 } 248 249 // Check if there is a non LF terminated line at the end of the file 250 if ( file.size() > pos ) { 251 LOG( logWARNING ) << 252 "Non LF terminated file, adding a fake end of line"; 253 linePosition.append( file.size() + 1 ); 254 linePosition.setFakeFinalLF(); 255 } 256 } 257 else { 258 // TODO: Check that the file is seekable? 259 // If the file cannot be open, we do as if it was empty 260 LOG(logWARNING) << "Cannot open file " << fileName_.toStdString(); 261 262 emit indexingProgressed( 100 ); 263 } 264 265 *maxLength = max_length; 266 267 return file.size(); 268 } 269 270 // Called in the worker thread's context 271 // Should not use any shared variable 272 bool FullIndexOperation::start( IndexingData& sharedData ) 273 { 274 LOG(logDEBUG) << "FullIndexOperation::start(), file " 275 << fileName_.toStdString(); 276 277 LOG(logDEBUG) << "FullIndexOperation: Starting the count..."; 278 int maxLength = 0; 279 LinePositionArray linePosition = {}; 280 281 emit indexingProgressed( 0 ); 282 283 qint64 size = doIndex( linePosition, &maxLength, 0 ); 284 285 if ( *interruptRequest_ == false ) 286 { 287 // Commit the results to the shared data (atomically) 288 sharedData.setAll( size, maxLength, linePosition ); 289 } 290 291 LOG(logDEBUG) << "FullIndexOperation: ... finished counting." 292 "interrupt = " << *interruptRequest_; 293 294 return ( *interruptRequest_ ? false : true ); 295 } 296 297 bool PartialIndexOperation::start( IndexingData& sharedData ) 298 { 299 LOG(logDEBUG) << "PartialIndexOperation::start(), file " 300 << fileName_.toStdString(); 301 302 LOG(logDEBUG) << "PartialIndexOperation: Starting the count at " 303 << initialPosition_ << " ..."; 304 int maxLength = 0; 305 LinePositionArray linePosition = {}; 306 307 emit indexingProgressed( 0 ); 308 309 qint64 size = doIndex( linePosition, &maxLength, initialPosition_ ); 310 311 if ( *interruptRequest_ == false ) 312 { 313 // Commit the results to the shared data (atomically) 314 sharedData.addAll( size - initialPosition_, maxLength, linePosition ); 315 } 316 317 LOG(logDEBUG) << "PartialIndexOperation: ... finished counting."; 318 319 return ( *interruptRequest_ ? false : true ); 320 } 321