xref: /glogg/src/data/logdataworkerthread.cpp (revision 653377b63177ff63943e126bf71462b35ceb8b9d)
1 /*
2  * Copyright (C) 2009, 2010, 2014, 2015 Nicolas Bonnefon and other contributors
3  *
4  * This file is part of glogg.
5  *
6  * glogg is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * glogg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with glogg.  If not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #include <QFile>
21 
22 #include "log.h"
23 
24 #include "logdata.h"
25 #include "logdataworkerthread.h"
26 
27 // Size of the chunk to read (5 MiB)
28 const int IndexOperation::sizeChunk = 5*1024*1024;
29 
30 qint64 IndexingData::getSize() const
31 {
32     QMutexLocker locker( &dataMutex_ );
33 
34     return indexedSize_;
35 }
36 
37 int IndexingData::getMaxLength() const
38 {
39     QMutexLocker locker( &dataMutex_ );
40 
41     return maxLength_;
42 }
43 
44 LineNumber IndexingData::getNbLines() const
45 {
46     QMutexLocker locker( &dataMutex_ );
47 
48     return linePosition_.size();
49 }
50 
51 qint64 IndexingData::getPosForLine( LineNumber line ) const
52 {
53     QMutexLocker locker( &dataMutex_ );
54 
55     return linePosition_.at( line );
56 }
57 
58 void IndexingData::setAll( qint64 size, int length,
59         const FastLinePositionArray& linePosition )
60 {
61     QMutexLocker locker( &dataMutex_ );
62 
63     indexedSize_  = size;
64     maxLength_    = length;
65     linePosition_ = std::move( linePosition );
66 }
67 
68 void IndexingData::addAll( qint64 size, int length,
69         const FastLinePositionArray& linePosition )
70 {
71     QMutexLocker locker( &dataMutex_ );
72 
73     indexedSize_  += size;
74     maxLength_     = qMax( maxLength_, length );
75     linePosition_.append_list( linePosition );
76 }
77 
78 void IndexingData::clear()
79 {
80     maxLength_   = 0;
81     indexedSize_ = 0;
82     linePosition_ = LinePositionArray();
83 }
84 
85 LogDataWorkerThread::LogDataWorkerThread( IndexingData* indexing_data )
86     : QThread(), mutex_(), operationRequestedCond_(),
87     nothingToDoCond_(), fileName_(), indexing_data_( indexing_data )
88 {
89     terminate_          = false;
90     interruptRequested_ = false;
91     operationRequested_ = NULL;
92 }
93 
94 LogDataWorkerThread::~LogDataWorkerThread()
95 {
96     {
97         QMutexLocker locker( &mutex_ );
98         terminate_ = true;
99         operationRequestedCond_.wakeAll();
100     }
101     wait();
102 }
103 
104 void LogDataWorkerThread::attachFile( const QString& fileName )
105 {
106     QMutexLocker locker( &mutex_ );  // to protect fileName_
107 
108     fileName_ = fileName;
109 }
110 
111 void LogDataWorkerThread::indexAll()
112 {
113     QMutexLocker locker( &mutex_ );  // to protect operationRequested_
114 
115     LOG(logDEBUG) << "FullIndex requested";
116 
117     // If an operation is ongoing, we will block
118     while ( (operationRequested_ != NULL) )
119         nothingToDoCond_.wait( &mutex_ );
120 
121     interruptRequested_ = false;
122     operationRequested_ = new FullIndexOperation( fileName_,
123             indexing_data_, &interruptRequested_ );
124     operationRequestedCond_.wakeAll();
125 }
126 
127 void LogDataWorkerThread::indexAdditionalLines( qint64 position )
128 {
129     QMutexLocker locker( &mutex_ );  // to protect operationRequested_
130 
131     LOG(logDEBUG) << "AddLines requested";
132 
133     // If an operation is ongoing, we will block
134     while ( (operationRequested_ != NULL) )
135         nothingToDoCond_.wait( &mutex_ );
136 
137     interruptRequested_ = false;
138     operationRequested_ = new PartialIndexOperation( fileName_,
139             indexing_data_, &interruptRequested_, position );
140     operationRequestedCond_.wakeAll();
141 }
142 
143 void LogDataWorkerThread::interrupt()
144 {
145     LOG(logDEBUG) << "Load interrupt requested";
146 
147     // No mutex here, setting a bool is probably atomic!
148     interruptRequested_ = true;
149 }
150 
151 // This is the thread's main loop
152 void LogDataWorkerThread::run()
153 {
154     QMutexLocker locker( &mutex_ );
155 
156     forever {
157         while ( (terminate_ == false) && (operationRequested_ == NULL) )
158             operationRequestedCond_.wait( &mutex_ );
159         LOG(logDEBUG) << "Worker thread signaled";
160 
161         // Look at what needs to be done
162         if ( terminate_ )
163             return;      // We must die
164 
165         if ( operationRequested_ ) {
166             connect( operationRequested_, SIGNAL( indexingProgressed( int ) ),
167                     this, SIGNAL( indexingProgressed( int ) ) );
168 
169             // Run the operation
170             try {
171                 if ( operationRequested_->start() ) {
172                     LOG(logDEBUG) << "... finished copy in workerThread.";
173                     emit indexingFinished( LoadingStatus::Successful );
174                 }
175                 else {
176                     emit indexingFinished( LoadingStatus::Interrupted );
177                 }
178             }
179             catch ( std::bad_alloc& ba ) {
180                 LOG(logERROR) << "Out of memory whilst indexing!";
181                 emit indexingFinished( LoadingStatus::NoMemory );
182             }
183 
184             delete operationRequested_;
185             operationRequested_ = NULL;
186             nothingToDoCond_.wakeAll();
187         }
188     }
189 }
190 
191 //
192 // Operations implementation
193 //
194 
195 IndexOperation::IndexOperation( const QString& fileName,
196        IndexingData* indexingData, bool* interruptRequest )
197     : fileName_( fileName )
198 {
199     interruptRequest_ = interruptRequest;
200     indexing_data_ = indexingData;
201 }
202 
203 PartialIndexOperation::PartialIndexOperation( const QString& fileName,
204         IndexingData* indexingData, bool* interruptRequest, qint64 position )
205     : IndexOperation( fileName, indexingData, interruptRequest )
206 {
207     initialPosition_ = position;
208 }
209 
210 void IndexOperation::doIndex( IndexingData* indexing_data, qint64 initialPosition )
211 {
212     qint64 pos = initialPosition; // Absolute position of the start of current line
213     qint64 end = 0;               // Absolute position of the end of current line
214     int additional_spaces = 0;    // Additional spaces due to tabs
215 
216     QFile file( fileName_ );
217     if ( file.open( QIODevice::ReadOnly ) ) {
218         // Count the number of lines and max length
219         // (read big chunks to speed up reading from disk)
220         file.seek( pos );
221         while ( !file.atEnd() ) {
222             FastLinePositionArray line_positions;
223             int max_length = 0;
224 
225             if ( *interruptRequest_ )   // a bool is always read/written atomically isn't it?
226                 break;
227 
228             // Read a chunk of 5MB
229             const qint64 block_beginning = file.pos();
230             const QByteArray block = file.read( sizeChunk );
231 
232             // Count the number of lines in each chunk
233             qint64 pos_within_block = 0;
234             while ( pos_within_block != -1 ) {
235                 pos_within_block = qMax( pos - block_beginning, 0LL);
236                 // Looking for the next \n, expanding tabs in the process
237                 do {
238                     if ( pos_within_block < block.length() ) {
239                         const char c = block.at(pos_within_block);
240                         if ( c == '\n' )
241                             break;
242                         else if ( c == '\t' )
243                             additional_spaces += AbstractLogData::tabStop -
244                                 ( ( ( block_beginning - pos ) + pos_within_block
245                                     + additional_spaces ) % AbstractLogData::tabStop ) - 1;
246 
247                         pos_within_block++;
248                     }
249                     else {
250                         pos_within_block = -1;
251                     }
252                 } while ( pos_within_block != -1 );
253 
254                 // When a end of line has been found...
255                 if ( pos_within_block != -1 ) {
256                     end = pos_within_block + block_beginning;
257                     const int length = end-pos + additional_spaces;
258                     if ( length > max_length )
259                         max_length = length;
260                     pos = end + 1;
261                     additional_spaces = 0;
262                     line_positions.append( pos );
263                 }
264             }
265 
266             // Update the shared data
267             indexing_data->addAll( block.length(), max_length, line_positions );
268 
269             // Update the caller for progress indication
270             int progress = ( file.size() > 0 ) ? pos*100 / file.size() : 100;
271             emit indexingProgressed( progress );
272         }
273 
274         // Check if there is a non LF terminated line at the end of the file
275         qint64 file_size = file.size();
276         if ( !*interruptRequest_ && file_size > pos ) {
277             LOG( logWARNING ) <<
278                 "Non LF terminated file, adding a fake end of line";
279 
280             FastLinePositionArray line_position;
281             line_position.append( file_size + 1 );
282             line_position.setFakeFinalLF();
283 
284             indexing_data->addAll( 0, 0, line_position );
285         }
286     }
287     else {
288         // TODO: Check that the file is seekable?
289         // If the file cannot be open, we do as if it was empty
290         LOG(logWARNING) << "Cannot open file " << fileName_.toStdString();
291 
292         emit indexingProgressed( 100 );
293     }
294 }
295 
296 // Called in the worker thread's context
297 bool FullIndexOperation::start()
298 {
299     LOG(logDEBUG) << "FullIndexOperation::start(), file "
300         << fileName_.toStdString();
301 
302     LOG(logDEBUG) << "FullIndexOperation: Starting the count...";
303 
304     emit indexingProgressed( 0 );
305 
306     // First empty the index
307     indexing_data_->clear();
308 
309     doIndex( indexing_data_, 0 );
310 
311     LOG(logDEBUG) << "FullIndexOperation: ... finished counting."
312         "interrupt = " << *interruptRequest_;
313 
314     return ( *interruptRequest_ ? false : true );
315 }
316 
317 bool PartialIndexOperation::start()
318 {
319     LOG(logDEBUG) << "PartialIndexOperation::start(), file "
320         << fileName_.toStdString();
321 
322     LOG(logDEBUG) << "PartialIndexOperation: Starting the count at "
323         << initialPosition_ << " ...";
324 
325     emit indexingProgressed( 0 );
326 
327     doIndex( indexing_data_, initialPosition_ );
328 
329     LOG(logDEBUG) << "PartialIndexOperation: ... finished counting.";
330 
331     return ( *interruptRequest_ ? false : true );
332 }
333