xref: /glogg/src/data/logdataworkerthread.cpp (revision af5438c7ece65e2f88fe509fd5c31d9c7260457e)
1 /*
2  * Copyright (C) 2009, 2010, 2014 Nicolas Bonnefon and other contributors
3  *
4  * This file is part of glogg.
5  *
6  * glogg is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * glogg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with glogg.  If not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #include <QFile>
21 
22 #include "log.h"
23 
24 #include "logdata.h"
25 #include "logdataworkerthread.h"
26 
27 // Size of the chunk to read (5 MiB)
28 const int IndexOperation::sizeChunk = 5*1024*1024;
29 
30 void IndexingData::getAll( qint64* size, int* length,
31         LinePositionArray* linePosition )
32 {
33     QMutexLocker locker( &dataMutex_ );
34 
35     *size         = indexedSize_;
36     *length       = maxLength_;
37     // FIXME: slooooooooow
38     *linePosition = linePosition_;
39 }
40 
41 void IndexingData::setAll( qint64 size, int length,
42         LinePositionArray& linePosition )
43 {
44     QMutexLocker locker( &dataMutex_ );
45 
46     indexedSize_  = size;
47     maxLength_    = length;
48     linePosition_ = std::move( linePosition );
49 }
50 
51 void IndexingData::addAll( qint64 size, int length,
52         const LinePositionArray& linePosition )
53 {
54     QMutexLocker locker( &dataMutex_ );
55 
56     indexedSize_  += size;
57     maxLength_     = qMax( maxLength_, length );
58     linePosition_.append_list( linePosition );
59 }
60 
61 LogDataWorkerThread::LogDataWorkerThread()
62     : QThread(), mutex_(), operationRequestedCond_(),
63     nothingToDoCond_(), fileName_(), indexingData_()
64 {
65     terminate_          = false;
66     interruptRequested_ = false;
67     operationRequested_ = NULL;
68 }
69 
70 LogDataWorkerThread::~LogDataWorkerThread()
71 {
72     {
73         QMutexLocker locker( &mutex_ );
74         terminate_ = true;
75         operationRequestedCond_.wakeAll();
76     }
77     wait();
78 }
79 
80 void LogDataWorkerThread::attachFile( const QString& fileName )
81 {
82     QMutexLocker locker( &mutex_ );  // to protect fileName_
83 
84     fileName_ = fileName;
85 }
86 
87 void LogDataWorkerThread::indexAll()
88 {
89     QMutexLocker locker( &mutex_ );  // to protect operationRequested_
90 
91     LOG(logDEBUG) << "FullIndex requested";
92 
93     // If an operation is ongoing, we will block
94     while ( (operationRequested_ != NULL) )
95         nothingToDoCond_.wait( &mutex_ );
96 
97     interruptRequested_ = false;
98     operationRequested_ = new FullIndexOperation( fileName_, &interruptRequested_ );
99     operationRequestedCond_.wakeAll();
100 }
101 
102 void LogDataWorkerThread::indexAdditionalLines( qint64 position )
103 {
104     QMutexLocker locker( &mutex_ );  // to protect operationRequested_
105 
106     LOG(logDEBUG) << "AddLines requested";
107 
108     // If an operation is ongoing, we will block
109     while ( (operationRequested_ != NULL) )
110         nothingToDoCond_.wait( &mutex_ );
111 
112     interruptRequested_ = false;
113     operationRequested_ = new PartialIndexOperation( fileName_, &interruptRequested_, position );
114     operationRequestedCond_.wakeAll();
115 }
116 
117 void LogDataWorkerThread::interrupt()
118 {
119     LOG(logDEBUG) << "Load interrupt requested";
120 
121     // No mutex here, setting a bool is probably atomic!
122     interruptRequested_ = true;
123 }
124 
125 // This will do an atomic copy of the object
126 // (hopefully fast as we use Qt containers)
127 void LogDataWorkerThread::getIndexingData(
128         qint64* indexedSize, int* maxLength, LinePositionArray* linePosition )
129 {
130     indexingData_.getAll( indexedSize, maxLength, linePosition );
131 }
132 
133 // This is the thread's main loop
134 void LogDataWorkerThread::run()
135 {
136     QMutexLocker locker( &mutex_ );
137 
138     forever {
139         while ( (terminate_ == false) && (operationRequested_ == NULL) )
140             operationRequestedCond_.wait( &mutex_ );
141         LOG(logDEBUG) << "Worker thread signaled";
142 
143         // Look at what needs to be done
144         if ( terminate_ )
145             return;      // We must die
146 
147         if ( operationRequested_ ) {
148             connect( operationRequested_, SIGNAL( indexingProgressed( int ) ),
149                     this, SIGNAL( indexingProgressed( int ) ) );
150 
151             // Run the operation
152             try {
153                 if ( operationRequested_->start( indexingData_ ) ) {
154                     LOG(logDEBUG) << "... finished copy in workerThread.";
155                     emit indexingFinished( LoadingStatus::Successful );
156                 }
157                 else {
158                     emit indexingFinished( LoadingStatus::Interrupted );
159                 }
160             }
161             catch ( std::bad_alloc& ba ) {
162                 LOG(logERROR) << "Out of memory whilst indexing!";
163                 emit indexingFinished( LoadingStatus::NoMemory );
164             }
165 
166             delete operationRequested_;
167             operationRequested_ = NULL;
168             nothingToDoCond_.wakeAll();
169         }
170     }
171 }
172 
173 //
174 // Operations implementation
175 //
176 
177 IndexOperation::IndexOperation( QString& fileName, bool* interruptRequest )
178     : fileName_( fileName )
179 {
180     interruptRequest_ = interruptRequest;
181 }
182 
183 PartialIndexOperation::PartialIndexOperation( QString& fileName,
184         bool* interruptRequest, qint64 position )
185     : IndexOperation( fileName, interruptRequest )
186 {
187     initialPosition_ = position;
188 }
189 
190 qint64 IndexOperation::doIndex( LinePositionArray& linePosition, int* maxLength,
191         qint64 initialPosition )
192 {
193     int max_length = *maxLength;
194     qint64 pos = initialPosition; // Absolute position of the start of current line
195     qint64 end = 0;               // Absolute position of the end of current line
196     int additional_spaces = 0;    // Additional spaces due to tabs
197 
198     QFile file( fileName_ );
199     if ( file.open( QIODevice::ReadOnly ) ) {
200         // Count the number of lines and max length
201         // (read big chunks to speed up reading from disk)
202         file.seek( pos );
203         while ( !file.atEnd() ) {
204             if ( *interruptRequest_ )   // a bool is always read/written atomically isn't it?
205                 break;
206 
207             // Read a chunk of 5MB
208             const qint64 block_beginning = file.pos();
209             const QByteArray block = file.read( sizeChunk );
210 
211             // Count the number of lines in each chunk
212             qint64 pos_within_block = 0;
213             while ( pos_within_block != -1 ) {
214                 pos_within_block = qMax( pos - block_beginning, 0LL);
215                 // Looking for the next \n, expanding tabs in the process
216                 do {
217                     if ( pos_within_block < block.length() ) {
218                         const char c = block.at(pos_within_block);
219                         if ( c == '\n' )
220                             break;
221                         else if ( c == '\t' )
222                             additional_spaces += AbstractLogData::tabStop -
223                                 ( ( ( block_beginning - pos ) + pos_within_block
224                                     + additional_spaces ) % AbstractLogData::tabStop ) - 1;
225 
226                         pos_within_block++;
227                     }
228                     else {
229                         pos_within_block = -1;
230                     }
231                 } while ( pos_within_block != -1 );
232 
233                 // When a end of line has been found...
234                 if ( pos_within_block != -1 ) {
235                     end = pos_within_block + block_beginning;
236                     const int length = end-pos + additional_spaces;
237                     if ( length > max_length )
238                         max_length = length;
239                     pos = end + 1;
240                     additional_spaces = 0;
241                     linePosition.append( pos );
242                 }
243             }
244 
245             // Update the caller for progress indication
246             int progress = ( file.size() > 0 ) ? pos*100 / file.size() : 100;
247             emit indexingProgressed( progress );
248         }
249 
250         // Check if there is a non LF terminated line at the end of the file
251         if ( file.size() > pos ) {
252             LOG( logWARNING ) <<
253                 "Non LF terminated file, adding a fake end of line";
254             linePosition.append( file.size() + 1 );
255             linePosition.setFakeFinalLF();
256         }
257     }
258     else {
259         // TODO: Check that the file is seekable?
260         // If the file cannot be open, we do as if it was empty
261         LOG(logWARNING) << "Cannot open file " << fileName_.toStdString();
262 
263         emit indexingProgressed( 100 );
264     }
265 
266     *maxLength = max_length;
267 
268     return file.size();
269 }
270 
271 // Called in the worker thread's context
272 // Should not use any shared variable
273 bool FullIndexOperation::start( IndexingData& sharedData )
274 {
275     LOG(logDEBUG) << "FullIndexOperation::start(), file "
276         << fileName_.toStdString();
277 
278     LOG(logDEBUG) << "FullIndexOperation: Starting the count...";
279     int maxLength = 0;
280     LinePositionArray linePosition = {};
281 
282     emit indexingProgressed( 0 );
283 
284     qint64 size = doIndex( linePosition, &maxLength, 0 );
285 
286     if ( *interruptRequest_ == false )
287     {
288         // Commit the results to the shared data (atomically)
289         sharedData.setAll( size, maxLength, linePosition );
290     }
291 
292     LOG(logDEBUG) << "FullIndexOperation: ... finished counting."
293         "interrupt = " << *interruptRequest_;
294 
295     return ( *interruptRequest_ ? false : true );
296 }
297 
298 bool PartialIndexOperation::start( IndexingData& sharedData )
299 {
300     LOG(logDEBUG) << "PartialIndexOperation::start(), file "
301         << fileName_.toStdString();
302 
303     LOG(logDEBUG) << "PartialIndexOperation: Starting the count at "
304         << initialPosition_ << " ...";
305     int maxLength = 0;
306     LinePositionArray linePosition = {};
307 
308     emit indexingProgressed( 0 );
309 
310     qint64 size = doIndex( linePosition, &maxLength, initialPosition_ );
311 
312     if ( *interruptRequest_ == false )
313     {
314         // Commit the results to the shared data (atomically)
315         sharedData.addAll( size - initialPosition_, maxLength, linePosition );
316     }
317 
318     LOG(logDEBUG) << "PartialIndexOperation: ... finished counting.";
319 
320     return ( *interruptRequest_ ? false : true );
321 }
322