1 /* 2 * Copyright (C) 2009, 2010, 2014, 2015 Nicolas Bonnefon and other contributors 3 * 4 * This file is part of glogg. 5 * 6 * glogg is free software: you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License as published by 8 * the Free Software Foundation, either version 3 of the License, or 9 * (at your option) any later version. 10 * 11 * glogg is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 * GNU General Public License for more details. 15 * 16 * You should have received a copy of the GNU General Public License 17 * along with glogg. If not, see <http://www.gnu.org/licenses/>. 18 */ 19 20 #include <QFile> 21 22 #include "log.h" 23 24 #include "logdata.h" 25 #include "logdataworkerthread.h" 26 27 // Size of the chunk to read (5 MiB) 28 const int IndexOperation::sizeChunk = 5*1024*1024; 29 30 qint64 IndexingData::getSize() const 31 { 32 QMutexLocker locker( &dataMutex_ ); 33 34 return indexedSize_; 35 } 36 37 int IndexingData::getMaxLength() const 38 { 39 QMutexLocker locker( &dataMutex_ ); 40 41 return maxLength_; 42 } 43 44 LineNumber IndexingData::getNbLines() const 45 { 46 QMutexLocker locker( &dataMutex_ ); 47 48 return linePosition_.size(); 49 } 50 51 qint64 IndexingData::getPosForLine( LineNumber line ) const 52 { 53 QMutexLocker locker( &dataMutex_ ); 54 55 return linePosition_.at( line ); 56 } 57 58 void IndexingData::setAll( qint64 size, int length, 59 const FastLinePositionArray& linePosition ) 60 { 61 QMutexLocker locker( &dataMutex_ ); 62 63 indexedSize_ = size; 64 maxLength_ = length; 65 linePosition_ = std::move( linePosition ); 66 } 67 68 void IndexingData::addAll( qint64 size, int length, 69 const FastLinePositionArray& linePosition ) 70 { 71 QMutexLocker locker( &dataMutex_ ); 72 73 indexedSize_ += size; 74 maxLength_ = qMax( maxLength_, length ); 75 linePosition_.append_list( linePosition ); 76 } 77 78 void IndexingData::clear() 79 { 80 maxLength_ = 0; 81 indexedSize_ = 0; 82 linePosition_ = LinePositionArray(); 83 } 84 85 LogDataWorkerThread::LogDataWorkerThread( IndexingData* indexing_data ) 86 : QThread(), mutex_(), operationRequestedCond_(), 87 nothingToDoCond_(), fileName_(), indexing_data_( indexing_data ) 88 { 89 terminate_ = false; 90 interruptRequested_ = false; 91 operationRequested_ = NULL; 92 } 93 94 LogDataWorkerThread::~LogDataWorkerThread() 95 { 96 { 97 QMutexLocker locker( &mutex_ ); 98 terminate_ = true; 99 operationRequestedCond_.wakeAll(); 100 } 101 wait(); 102 } 103 104 void LogDataWorkerThread::attachFile( const QString& fileName ) 105 { 106 QMutexLocker locker( &mutex_ ); // to protect fileName_ 107 108 fileName_ = fileName; 109 } 110 111 void LogDataWorkerThread::indexAll() 112 { 113 QMutexLocker locker( &mutex_ ); // to protect operationRequested_ 114 115 LOG(logDEBUG) << "FullIndex requested"; 116 117 // If an operation is ongoing, we will block 118 while ( (operationRequested_ != NULL) ) 119 nothingToDoCond_.wait( &mutex_ ); 120 121 interruptRequested_ = false; 122 operationRequested_ = new FullIndexOperation( fileName_, 123 indexing_data_, &interruptRequested_ ); 124 operationRequestedCond_.wakeAll(); 125 } 126 127 void LogDataWorkerThread::indexAdditionalLines( qint64 position ) 128 { 129 QMutexLocker locker( &mutex_ ); // to protect operationRequested_ 130 131 LOG(logDEBUG) << "AddLines requested"; 132 133 // If an operation is ongoing, we will block 134 while ( (operationRequested_ != NULL) ) 135 nothingToDoCond_.wait( &mutex_ ); 136 137 interruptRequested_ = false; 138 operationRequested_ = new PartialIndexOperation( fileName_, 139 indexing_data_, &interruptRequested_, position ); 140 operationRequestedCond_.wakeAll(); 141 } 142 143 void LogDataWorkerThread::interrupt() 144 { 145 LOG(logDEBUG) << "Load interrupt requested"; 146 147 // No mutex here, setting a bool is probably atomic! 148 interruptRequested_ = true; 149 } 150 151 // This is the thread's main loop 152 void LogDataWorkerThread::run() 153 { 154 QMutexLocker locker( &mutex_ ); 155 156 forever { 157 while ( (terminate_ == false) && (operationRequested_ == NULL) ) 158 operationRequestedCond_.wait( &mutex_ ); 159 LOG(logDEBUG) << "Worker thread signaled"; 160 161 // Look at what needs to be done 162 if ( terminate_ ) 163 return; // We must die 164 165 if ( operationRequested_ ) { 166 connect( operationRequested_, SIGNAL( indexingProgressed( int ) ), 167 this, SIGNAL( indexingProgressed( int ) ) ); 168 169 // Run the operation 170 try { 171 if ( operationRequested_->start() ) { 172 LOG(logDEBUG) << "... finished copy in workerThread."; 173 emit indexingFinished( LoadingStatus::Successful ); 174 } 175 else { 176 emit indexingFinished( LoadingStatus::Interrupted ); 177 } 178 } 179 catch ( std::bad_alloc& ba ) { 180 LOG(logERROR) << "Out of memory whilst indexing!"; 181 emit indexingFinished( LoadingStatus::NoMemory ); 182 } 183 184 delete operationRequested_; 185 operationRequested_ = NULL; 186 nothingToDoCond_.wakeAll(); 187 } 188 } 189 } 190 191 // 192 // Operations implementation 193 // 194 195 IndexOperation::IndexOperation( const QString& fileName, 196 IndexingData* indexingData, bool* interruptRequest ) 197 : fileName_( fileName ) 198 { 199 interruptRequest_ = interruptRequest; 200 indexing_data_ = indexingData; 201 } 202 203 PartialIndexOperation::PartialIndexOperation( const QString& fileName, 204 IndexingData* indexingData, bool* interruptRequest, qint64 position ) 205 : IndexOperation( fileName, indexingData, interruptRequest ) 206 { 207 initialPosition_ = position; 208 } 209 210 void IndexOperation::doIndex( IndexingData* indexing_data, qint64 initialPosition ) 211 { 212 qint64 pos = initialPosition; // Absolute position of the start of current line 213 qint64 end = 0; // Absolute position of the end of current line 214 int additional_spaces = 0; // Additional spaces due to tabs 215 216 QFile file( fileName_ ); 217 if ( file.open( QIODevice::ReadOnly ) ) { 218 // Count the number of lines and max length 219 // (read big chunks to speed up reading from disk) 220 file.seek( pos ); 221 while ( !file.atEnd() ) { 222 FastLinePositionArray line_positions; 223 int max_length = 0; 224 225 if ( *interruptRequest_ ) // a bool is always read/written atomically isn't it? 226 break; 227 228 // Read a chunk of 5MB 229 const qint64 block_beginning = file.pos(); 230 const QByteArray block = file.read( sizeChunk ); 231 232 // Count the number of lines in each chunk 233 qint64 pos_within_block = 0; 234 while ( pos_within_block != -1 ) { 235 pos_within_block = qMax( pos - block_beginning, 0LL); 236 // Looking for the next \n, expanding tabs in the process 237 do { 238 if ( pos_within_block < block.length() ) { 239 const char c = block.at(pos_within_block); 240 if ( c == '\n' ) 241 break; 242 else if ( c == '\t' ) 243 additional_spaces += AbstractLogData::tabStop - 244 ( ( ( block_beginning - pos ) + pos_within_block 245 + additional_spaces ) % AbstractLogData::tabStop ) - 1; 246 247 pos_within_block++; 248 } 249 else { 250 pos_within_block = -1; 251 } 252 } while ( pos_within_block != -1 ); 253 254 // When a end of line has been found... 255 if ( pos_within_block != -1 ) { 256 end = pos_within_block + block_beginning; 257 const int length = end-pos + additional_spaces; 258 if ( length > max_length ) 259 max_length = length; 260 pos = end + 1; 261 additional_spaces = 0; 262 line_positions.append( pos ); 263 } 264 } 265 266 // Update the shared data 267 indexing_data->addAll( block.length(), max_length, line_positions ); 268 269 // Update the caller for progress indication 270 int progress = ( file.size() > 0 ) ? pos*100 / file.size() : 100; 271 emit indexingProgressed( progress ); 272 } 273 274 // Check if there is a non LF terminated line at the end of the file 275 qint64 file_size = file.size(); 276 if ( !*interruptRequest_ && file_size > pos ) { 277 LOG( logWARNING ) << 278 "Non LF terminated file, adding a fake end of line"; 279 280 FastLinePositionArray line_position; 281 line_position.append( file_size + 1 ); 282 line_position.setFakeFinalLF(); 283 284 indexing_data->addAll( 0, 0, line_position ); 285 } 286 } 287 else { 288 // TODO: Check that the file is seekable? 289 // If the file cannot be open, we do as if it was empty 290 LOG(logWARNING) << "Cannot open file " << fileName_.toStdString(); 291 292 emit indexingProgressed( 100 ); 293 } 294 } 295 296 // Called in the worker thread's context 297 bool FullIndexOperation::start() 298 { 299 LOG(logDEBUG) << "FullIndexOperation::start(), file " 300 << fileName_.toStdString(); 301 302 LOG(logDEBUG) << "FullIndexOperation: Starting the count..."; 303 304 emit indexingProgressed( 0 ); 305 306 // First empty the index 307 indexing_data_->clear(); 308 309 doIndex( indexing_data_, 0 ); 310 311 LOG(logDEBUG) << "FullIndexOperation: ... finished counting." 312 "interrupt = " << *interruptRequest_; 313 314 return ( *interruptRequest_ ? false : true ); 315 } 316 317 bool PartialIndexOperation::start() 318 { 319 LOG(logDEBUG) << "PartialIndexOperation::start(), file " 320 << fileName_.toStdString(); 321 322 LOG(logDEBUG) << "PartialIndexOperation: Starting the count at " 323 << initialPosition_ << " ..."; 324 325 emit indexingProgressed( 0 ); 326 327 doIndex( indexing_data_, initialPosition_ ); 328 329 LOG(logDEBUG) << "PartialIndexOperation: ... finished counting."; 330 331 return ( *interruptRequest_ ? false : true ); 332 } 333