xref: /glogg/src/data/logdataworkerthread.cpp (revision 84af0c9b75fb9369ab66df476dd881cd6d30efcf)
1 /*
2  * Copyright (C) 2009, 2010, 2014 Nicolas Bonnefon and other contributors
3  *
4  * This file is part of glogg.
5  *
6  * glogg is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * glogg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with glogg.  If not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #include <QFile>
21 
22 #include "log.h"
23 
24 #include "logdata.h"
25 #include "logdataworkerthread.h"
26 
27 // Size of the chunk to read (5 MiB)
28 const int IndexOperation::sizeChunk = 5*1024*1024;
29 
30 void IndexingData::getAll( qint64* size, int* length,
31         LinePositionArray* linePosition )
32 {
33     QMutexLocker locker( &dataMutex_ );
34 
35     *size         = indexedSize_;
36     *length       = maxLength_;
37     *linePosition = linePosition_;
38 }
39 
40 void IndexingData::setAll( qint64 size, int length,
41         const LinePositionArray& linePosition )
42 {
43     QMutexLocker locker( &dataMutex_ );
44 
45     indexedSize_  = size;
46     maxLength_    = length;
47     linePosition_ = linePosition;
48 }
49 
50 void IndexingData::addAll( qint64 size, int length,
51         const LinePositionArray& linePosition )
52 {
53     QMutexLocker locker( &dataMutex_ );
54 
55     indexedSize_  += size;
56     maxLength_     = qMax( maxLength_, length );
57     linePosition_ += linePosition;
58 }
59 
60 LogDataWorkerThread::LogDataWorkerThread()
61     : QThread(), mutex_(), operationRequestedCond_(),
62     nothingToDoCond_(), fileName_(), indexingData_()
63 {
64     terminate_          = false;
65     interruptRequested_ = false;
66     operationRequested_ = NULL;
67 }
68 
69 LogDataWorkerThread::~LogDataWorkerThread()
70 {
71     {
72         QMutexLocker locker( &mutex_ );
73         terminate_ = true;
74         operationRequestedCond_.wakeAll();
75     }
76     wait();
77 }
78 
79 void LogDataWorkerThread::attachFile( const QString& fileName )
80 {
81     QMutexLocker locker( &mutex_ );  // to protect fileName_
82 
83     fileName_ = fileName;
84 }
85 
86 void LogDataWorkerThread::indexAll()
87 {
88     QMutexLocker locker( &mutex_ );  // to protect operationRequested_
89 
90     LOG(logDEBUG) << "FullIndex requested";
91 
92     // If an operation is ongoing, we will block
93     while ( (operationRequested_ != NULL) )
94         nothingToDoCond_.wait( &mutex_ );
95 
96     interruptRequested_ = false;
97     operationRequested_ = new FullIndexOperation( fileName_, &interruptRequested_ );
98     operationRequestedCond_.wakeAll();
99 }
100 
101 void LogDataWorkerThread::indexAdditionalLines( qint64 position )
102 {
103     QMutexLocker locker( &mutex_ );  // to protect operationRequested_
104 
105     LOG(logDEBUG) << "AddLines requested";
106 
107     // If an operation is ongoing, we will block
108     while ( (operationRequested_ != NULL) )
109         nothingToDoCond_.wait( &mutex_ );
110 
111     interruptRequested_ = false;
112     operationRequested_ = new PartialIndexOperation( fileName_, &interruptRequested_, position );
113     operationRequestedCond_.wakeAll();
114 }
115 
116 void LogDataWorkerThread::interrupt()
117 {
118     LOG(logDEBUG) << "Load interrupt requested";
119 
120     // No mutex here, setting a bool is probably atomic!
121     interruptRequested_ = true;
122 }
123 
124 // This will do an atomic copy of the object
125 // (hopefully fast as we use Qt containers)
126 void LogDataWorkerThread::getIndexingData(
127         qint64* indexedSize, int* maxLength, LinePositionArray* linePosition )
128 {
129     indexingData_.getAll( indexedSize, maxLength, linePosition );
130 }
131 
132 // This is the thread's main loop
133 void LogDataWorkerThread::run()
134 {
135     QMutexLocker locker( &mutex_ );
136 
137     forever {
138         while ( (terminate_ == false) && (operationRequested_ == NULL) )
139             operationRequestedCond_.wait( &mutex_ );
140         LOG(logDEBUG) << "Worker thread signaled";
141 
142         // Look at what needs to be done
143         if ( terminate_ )
144             return;      // We must die
145 
146         if ( operationRequested_ ) {
147             connect( operationRequested_, SIGNAL( indexingProgressed( int ) ),
148                     this, SIGNAL( indexingProgressed( int ) ) );
149 
150             // Run the operation
151             try {
152                 if ( operationRequested_->start( indexingData_ ) ) {
153                     LOG(logDEBUG) << "... finished copy in workerThread.";
154                     emit indexingFinished( LoadingStatus::Successful );
155                 }
156                 else {
157                     emit indexingFinished( LoadingStatus::Interrupted );
158                 }
159             }
160             catch ( std::bad_alloc& ba ) {
161                 LOG(logERROR) << "Out of memory whilst indexing!";
162                 emit indexingFinished( LoadingStatus::NoMemory );
163             }
164 
165             delete operationRequested_;
166             operationRequested_ = NULL;
167             nothingToDoCond_.wakeAll();
168         }
169     }
170 }
171 
172 //
173 // Operations implementation
174 //
175 
176 IndexOperation::IndexOperation( QString& fileName, bool* interruptRequest )
177     : fileName_( fileName )
178 {
179     interruptRequest_ = interruptRequest;
180 }
181 
182 PartialIndexOperation::PartialIndexOperation( QString& fileName,
183         bool* interruptRequest, qint64 position )
184     : IndexOperation( fileName, interruptRequest )
185 {
186     initialPosition_ = position;
187 }
188 
189 qint64 IndexOperation::doIndex( LinePositionArray& linePosition, int* maxLength,
190         qint64 initialPosition )
191 {
192     int max_length = *maxLength;
193     qint64 pos = initialPosition; // Absolute position of the start of current line
194     qint64 end = 0;               // Absolute position of the end of current line
195     int additional_spaces = 0;    // Additional spaces due to tabs
196 
197     QFile file( fileName_ );
198     if ( file.open( QIODevice::ReadOnly ) ) {
199         // Count the number of lines and max length
200         // (read big chunks to speed up reading from disk)
201         file.seek( pos );
202         while ( !file.atEnd() ) {
203             if ( *interruptRequest_ )   // a bool is always read/written atomically isn't it?
204                 break;
205 
206             // Read a chunk of 5MB
207             const qint64 block_beginning = file.pos();
208             const QByteArray block = file.read( sizeChunk );
209 
210             // Count the number of lines in each chunk
211             qint64 pos_within_block = 0;
212             while ( pos_within_block != -1 ) {
213                 pos_within_block = qMax( pos - block_beginning, 0LL);
214                 // Looking for the next \n, expanding tabs in the process
215                 do {
216                     if ( pos_within_block < block.length() ) {
217                         const char c = block.at(pos_within_block);
218                         if ( c == '\n' )
219                             break;
220                         else if ( c == '\t' )
221                             additional_spaces += AbstractLogData::tabStop -
222                                 ( ( ( block_beginning - pos ) + pos_within_block
223                                     + additional_spaces ) % AbstractLogData::tabStop ) - 1;
224 
225                         pos_within_block++;
226                     }
227                     else {
228                         pos_within_block = -1;
229                     }
230                 } while ( pos_within_block != -1 );
231 
232                 // When a end of line has been found...
233                 if ( pos_within_block != -1 ) {
234                     end = pos_within_block + block_beginning;
235                     const int length = end-pos + additional_spaces;
236                     if ( length > max_length )
237                         max_length = length;
238                     pos = end + 1;
239                     additional_spaces = 0;
240                     linePosition.append( pos );
241                 }
242             }
243 
244             // Update the caller for progress indication
245             int progress = ( file.size() > 0 ) ? pos*100 / file.size() : 100;
246             emit indexingProgressed( progress );
247         }
248 
249         // Check if there is a non LF terminated line at the end of the file
250         if ( file.size() > pos ) {
251             LOG( logWARNING ) <<
252                 "Non LF terminated file, adding a fake end of line";
253             linePosition.append( file.size() + 1 );
254             linePosition.setFakeFinalLF();
255         }
256     }
257     else {
258         // TODO: Check that the file is seekable?
259         // If the file cannot be open, we do as if it was empty
260         LOG(logWARNING) << "Cannot open file " << fileName_.toStdString();
261 
262         emit indexingProgressed( 100 );
263     }
264 
265     *maxLength = max_length;
266 
267     return file.size();
268 }
269 
270 // Called in the worker thread's context
271 // Should not use any shared variable
272 bool FullIndexOperation::start( IndexingData& sharedData )
273 {
274     LOG(logDEBUG) << "FullIndexOperation::start(), file "
275         << fileName_.toStdString();
276 
277     LOG(logDEBUG) << "FullIndexOperation: Starting the count...";
278     int maxLength = 0;
279     LinePositionArray linePosition = LinePositionArray();
280 
281     emit indexingProgressed( 0 );
282 
283     qint64 size = doIndex( linePosition, &maxLength, 0 );
284 
285     if ( *interruptRequest_ == false )
286     {
287         // Commit the results to the shared data (atomically)
288         sharedData.setAll( size, maxLength, linePosition );
289     }
290 
291     LOG(logDEBUG) << "FullIndexOperation: ... finished counting."
292         "interrupt = " << *interruptRequest_;
293 
294     return ( *interruptRequest_ ? false : true );
295 }
296 
297 bool PartialIndexOperation::start( IndexingData& sharedData )
298 {
299     LOG(logDEBUG) << "PartialIndexOperation::start(), file "
300         << fileName_.toStdString();
301 
302     LOG(logDEBUG) << "PartialIndexOperation: Starting the count at "
303         << initialPosition_ << " ...";
304     int maxLength = 0;
305     LinePositionArray linePosition = LinePositionArray();
306 
307     emit indexingProgressed( 0 );
308 
309     qint64 size = doIndex( linePosition, &maxLength, initialPosition_ );
310 
311     if ( *interruptRequest_ == false )
312     {
313         // Commit the results to the shared data (atomically)
314         sharedData.addAll( size - initialPosition_, maxLength, linePosition );
315     }
316 
317     LOG(logDEBUG) << "PartialIndexOperation: ... finished counting.";
318 
319     return ( *interruptRequest_ ? false : true );
320 }
321