1 /* 2 * Copyright (C) 2009, 2010 Nicolas Bonnefon and other contributors 3 * 4 * This file is part of glogg. 5 * 6 * glogg is free software: you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License as published by 8 * the Free Software Foundation, either version 3 of the License, or 9 * (at your option) any later version. 10 * 11 * glogg is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 * GNU General Public License for more details. 15 * 16 * You should have received a copy of the GNU General Public License 17 * along with glogg. If not, see <http://www.gnu.org/licenses/>. 18 */ 19 20 #include <QFile> 21 22 #include "log.h" 23 24 #include "logdata.h" 25 #include "logdataworkerthread.h" 26 27 // Size of the chunk to read (5 MiB) 28 const int IndexOperation::sizeChunk = 5*1024*1024; 29 30 void IndexingData::getAll( qint64* size, int* length, 31 LinePositionArray* linePosition ) 32 { 33 QMutexLocker locker( &dataMutex_ ); 34 35 *size = indexedSize_; 36 *length = maxLength_; 37 *linePosition = linePosition_; 38 } 39 40 void IndexingData::setAll( qint64 size, int length, 41 const LinePositionArray& linePosition ) 42 { 43 QMutexLocker locker( &dataMutex_ ); 44 45 indexedSize_ = size; 46 maxLength_ = length; 47 linePosition_ = linePosition; 48 } 49 50 void IndexingData::addAll( qint64 size, int length, 51 const LinePositionArray& linePosition ) 52 { 53 QMutexLocker locker( &dataMutex_ ); 54 55 indexedSize_ += size; 56 maxLength_ = qMax( maxLength_, length ); 57 linePosition_ += linePosition; 58 } 59 60 LogDataWorkerThread::LogDataWorkerThread() 61 : QThread(), mutex_(), operationRequestedCond_(), 62 nothingToDoCond_(), fileName_(), indexingData_() 63 { 64 terminate_ = false; 65 interruptRequested_ = false; 66 operationRequested_ = NULL; 67 } 68 69 LogDataWorkerThread::~LogDataWorkerThread() 70 { 71 { 72 QMutexLocker locker( &mutex_ ); 73 terminate_ = true; 74 operationRequestedCond_.wakeAll(); 75 } 76 wait(); 77 } 78 79 void LogDataWorkerThread::attachFile( const QString& fileName ) 80 { 81 QMutexLocker locker( &mutex_ ); // to protect fileName_ 82 83 fileName_ = fileName; 84 } 85 86 void LogDataWorkerThread::indexAll() 87 { 88 QMutexLocker locker( &mutex_ ); // to protect operationRequested_ 89 90 LOG(logDEBUG) << "FullIndex requested"; 91 92 // If an operation is ongoing, we will block 93 while ( (operationRequested_ != NULL) ) 94 nothingToDoCond_.wait( &mutex_ ); 95 96 interruptRequested_ = false; 97 operationRequested_ = new FullIndexOperation( fileName_, &interruptRequested_ ); 98 operationRequestedCond_.wakeAll(); 99 } 100 101 void LogDataWorkerThread::indexAdditionalLines( qint64 position ) 102 { 103 QMutexLocker locker( &mutex_ ); // to protect operationRequested_ 104 105 LOG(logDEBUG) << "AddLines requested"; 106 107 // If an operation is ongoing, we will block 108 while ( (operationRequested_ != NULL) ) 109 nothingToDoCond_.wait( &mutex_ ); 110 111 interruptRequested_ = false; 112 operationRequested_ = new PartialIndexOperation( fileName_, &interruptRequested_, position ); 113 operationRequestedCond_.wakeAll(); 114 } 115 116 void LogDataWorkerThread::interrupt() 117 { 118 LOG(logDEBUG) << "Load interrupt requested"; 119 120 // No mutex here, setting a bool is probably atomic! 121 interruptRequested_ = true; 122 } 123 124 // This will do an atomic copy of the object 125 // (hopefully fast as we use Qt containers) 126 void LogDataWorkerThread::getIndexingData( 127 qint64* indexedSize, int* maxLength, LinePositionArray* linePosition ) 128 { 129 indexingData_.getAll( indexedSize, maxLength, linePosition ); 130 } 131 132 // This is the thread's main loop 133 void LogDataWorkerThread::run() 134 { 135 QMutexLocker locker( &mutex_ ); 136 137 forever { 138 while ( (terminate_ == false) && (operationRequested_ == NULL) ) 139 operationRequestedCond_.wait( &mutex_ ); 140 LOG(logDEBUG) << "Worker thread signaled"; 141 142 // Look at what needs to be done 143 if ( terminate_ ) 144 return; // We must die 145 146 if ( operationRequested_ ) { 147 connect( operationRequested_, SIGNAL( indexingProgressed( int ) ), 148 this, SIGNAL( indexingProgressed( int ) ) ); 149 150 // Run the operation 151 if ( operationRequested_->start( indexingData_ ) ) { 152 LOG(logDEBUG) << "... finished copy in workerThread."; 153 emit indexingFinished( true ); 154 } 155 else { 156 emit indexingFinished( false ); 157 } 158 159 delete operationRequested_; 160 operationRequested_ = NULL; 161 nothingToDoCond_.wakeAll(); 162 } 163 } 164 } 165 166 // 167 // Operations implementation 168 // 169 170 IndexOperation::IndexOperation( QString& fileName, bool* interruptRequest ) 171 : fileName_( fileName ) 172 { 173 interruptRequest_ = interruptRequest; 174 } 175 176 PartialIndexOperation::PartialIndexOperation( QString& fileName, 177 bool* interruptRequest, qint64 position ) 178 : IndexOperation( fileName, interruptRequest ) 179 { 180 initialPosition_ = position; 181 } 182 183 qint64 IndexOperation::doIndex( LinePositionArray& linePosition, int* maxLength, 184 qint64 initialPosition ) 185 { 186 int max_length = *maxLength; 187 qint64 pos = initialPosition; // Absolute position of the start of current line 188 qint64 end = 0; // Absolute position of the end of current line 189 int additional_spaces = 0; // Additional spaces due to tabs 190 191 QFile file( fileName_ ); 192 if ( file.open( QIODevice::ReadOnly ) ) { 193 // Count the number of lines and max length 194 // (read big chunks to speed up reading from disk) 195 file.seek( pos ); 196 while ( !file.atEnd() ) { 197 if ( *interruptRequest_ ) // a bool is always read/written atomically isn't it? 198 break; 199 200 // Read a chunk of 5MB 201 const qint64 block_beginning = file.pos(); 202 const QByteArray block = file.read( sizeChunk ); 203 204 // Count the number of lines in each chunk 205 qint64 pos_within_block = 0; 206 while ( pos_within_block != -1 ) { 207 pos_within_block = qMax( pos - block_beginning, 0LL); 208 // Looking for the next \n, expanding tabs in the process 209 do { 210 if ( pos_within_block < block.length() ) { 211 const char c = block.at(pos_within_block); 212 if ( c == '\n' ) 213 break; 214 else if ( c == '\t' ) 215 additional_spaces += AbstractLogData::tabStop - 216 ( ( ( block_beginning - pos ) + pos_within_block 217 + additional_spaces ) % AbstractLogData::tabStop ) - 1; 218 219 pos_within_block++; 220 } 221 else { 222 pos_within_block = -1; 223 } 224 } while ( pos_within_block != -1 ); 225 226 // When a end of line has been found... 227 if ( pos_within_block != -1 ) { 228 end = pos_within_block + block_beginning; 229 const int length = end-pos + additional_spaces; 230 if ( length > max_length ) 231 max_length = length; 232 pos = end + 1; 233 additional_spaces = 0; 234 linePosition.append( pos ); 235 } 236 } 237 238 // Update the caller for progress indication 239 int progress = ( file.size() > 0 ) ? pos*100 / file.size() : 100; 240 emit indexingProgressed( progress ); 241 } 242 243 // Check if there is a non LF terminated line at the end of the file 244 if ( file.size() > pos ) { 245 LOG( logWARNING ) << 246 "Non LF terminated file, adding a fake end of line"; 247 linePosition.append( file.size() + 1 ); 248 linePosition.setFakeFinalLF(); 249 } 250 } 251 else { 252 // TODO: Check that the file is seekable? 253 // If the file cannot be open, we do as if it was empty 254 LOG(logWARNING) << "Cannot open file " << fileName_.toStdString(); 255 256 emit indexingProgressed( 100 ); 257 } 258 259 *maxLength = max_length; 260 261 return file.size(); 262 } 263 264 // Called in the worker thread's context 265 // Should not use any shared variable 266 bool FullIndexOperation::start( IndexingData& sharedData ) 267 { 268 LOG(logDEBUG) << "FullIndexOperation::start(), file " 269 << fileName_.toStdString(); 270 271 LOG(logDEBUG) << "FullIndexOperation: Starting the count..."; 272 int maxLength = 0; 273 LinePositionArray linePosition = LinePositionArray(); 274 275 emit indexingProgressed( 0 ); 276 277 qint64 size = doIndex( linePosition, &maxLength, 0 ); 278 279 if ( *interruptRequest_ == false ) 280 { 281 // Commit the results to the shared data (atomically) 282 sharedData.setAll( size, maxLength, linePosition ); 283 } 284 285 LOG(logDEBUG) << "FullIndexOperation: ... finished counting." 286 "interrupt = " << *interruptRequest_; 287 288 return ( *interruptRequest_ ? false : true ); 289 } 290 291 bool PartialIndexOperation::start( IndexingData& sharedData ) 292 { 293 LOG(logDEBUG) << "PartialIndexOperation::start(), file " 294 << fileName_.toStdString(); 295 296 LOG(logDEBUG) << "PartialIndexOperation: Starting the count at " 297 << initialPosition_ << " ..."; 298 int maxLength = 0; 299 LinePositionArray linePosition = LinePositionArray(); 300 301 emit indexingProgressed( 0 ); 302 303 qint64 size = doIndex( linePosition, &maxLength, initialPosition_ ); 304 305 if ( *interruptRequest_ == false ) 306 { 307 // Commit the results to the shared data (atomically) 308 sharedData.addAll( size - initialPosition_, maxLength, linePosition ); 309 } 310 311 LOG(logDEBUG) << "PartialIndexOperation: ... finished counting."; 312 313 return ( *interruptRequest_ ? false : true ); 314 } 315