xref: /glogg/src/data/logdataworkerthread.cpp (revision 821cac888d515a4e41b5d4ba4130c56db4463501)
1 /*
2  * Copyright (C) 2009, 2010 Nicolas Bonnefon and other contributors
3  *
4  * This file is part of glogg.
5  *
6  * glogg is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * glogg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with glogg.  If not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #include <QFile>
21 
22 #include "log.h"
23 
24 #include "logdata.h"
25 #include "logdataworkerthread.h"
26 
27 // Size of the chunk to read (5 MiB)
28 const int IndexOperation::sizeChunk = 5*1024*1024;
29 
30 void IndexingData::getAll( qint64* size, int* length,
31         LinePositionArray* linePosition )
32 {
33     QMutexLocker locker( &dataMutex_ );
34 
35     *size         = indexedSize_;
36     *length       = maxLength_;
37     *linePosition = linePosition_;
38 }
39 
40 void IndexingData::setAll( qint64 size, int length,
41         const LinePositionArray& linePosition )
42 {
43     QMutexLocker locker( &dataMutex_ );
44 
45     indexedSize_  = size;
46     maxLength_    = length;
47     linePosition_ = linePosition;
48 }
49 
50 void IndexingData::addAll( qint64 size, int length,
51         const LinePositionArray& linePosition )
52 {
53     QMutexLocker locker( &dataMutex_ );
54 
55     indexedSize_  += size;
56     maxLength_     = qMax( maxLength_, length );
57     linePosition_ += linePosition;
58 }
59 
60 LogDataWorkerThread::LogDataWorkerThread()
61     : QThread(), mutex_(), operationRequestedCond_(),
62     nothingToDoCond_(), fileName_(), indexingData_()
63 {
64     terminate_          = false;
65     interruptRequested_ = false;
66     operationRequested_ = NULL;
67 }
68 
69 LogDataWorkerThread::~LogDataWorkerThread()
70 {
71     {
72         QMutexLocker locker( &mutex_ );
73         terminate_ = true;
74         operationRequestedCond_.wakeAll();
75     }
76     wait();
77 }
78 
79 void LogDataWorkerThread::attachFile( const QString& fileName )
80 {
81     QMutexLocker locker( &mutex_ );  // to protect fileName_
82 
83     fileName_ = fileName;
84 }
85 
86 void LogDataWorkerThread::indexAll()
87 {
88     QMutexLocker locker( &mutex_ );  // to protect operationRequested_
89 
90     LOG(logDEBUG) << "FullIndex requested";
91 
92     // If an operation is ongoing, we will block
93     while ( (operationRequested_ != NULL) )
94         nothingToDoCond_.wait( &mutex_ );
95 
96     interruptRequested_ = false;
97     operationRequested_ = new FullIndexOperation( fileName_, &interruptRequested_ );
98     operationRequestedCond_.wakeAll();
99 }
100 
101 void LogDataWorkerThread::indexAdditionalLines( qint64 position )
102 {
103     QMutexLocker locker( &mutex_ );  // to protect operationRequested_
104 
105     LOG(logDEBUG) << "AddLines requested";
106 
107     // If an operation is ongoing, we will block
108     while ( (operationRequested_ != NULL) )
109         nothingToDoCond_.wait( &mutex_ );
110 
111     interruptRequested_ = false;
112     operationRequested_ = new PartialIndexOperation( fileName_, &interruptRequested_, position );
113     operationRequestedCond_.wakeAll();
114 }
115 
116 void LogDataWorkerThread::interrupt()
117 {
118     LOG(logDEBUG) << "Load interrupt requested";
119 
120     // No mutex here, setting a bool is probably atomic!
121     interruptRequested_ = true;
122 }
123 
124 // This will do an atomic copy of the object
125 // (hopefully fast as we use Qt containers)
126 void LogDataWorkerThread::getIndexingData(
127         qint64* indexedSize, int* maxLength, LinePositionArray* linePosition )
128 {
129     indexingData_.getAll( indexedSize, maxLength, linePosition );
130 }
131 
132 // This is the thread's main loop
133 void LogDataWorkerThread::run()
134 {
135     QMutexLocker locker( &mutex_ );
136 
137     forever {
138         while ( (terminate_ == false) && (operationRequested_ == NULL) )
139             operationRequestedCond_.wait( &mutex_ );
140         LOG(logDEBUG) << "Worker thread signaled";
141 
142         // Look at what needs to be done
143         if ( terminate_ )
144             return;      // We must die
145 
146         if ( operationRequested_ ) {
147             connect( operationRequested_, SIGNAL( indexingProgressed( int ) ),
148                     this, SIGNAL( indexingProgressed( int ) ) );
149 
150             // Run the operation
151             if ( operationRequested_->start( indexingData_ ) ) {
152                 LOG(logDEBUG) << "... finished copy in workerThread.";
153                 emit indexingFinished( true );
154             }
155             else {
156                 emit indexingFinished( false );
157             }
158 
159             delete operationRequested_;
160             operationRequested_ = NULL;
161             nothingToDoCond_.wakeAll();
162         }
163     }
164 }
165 
166 //
167 // Operations implementation
168 //
169 
170 IndexOperation::IndexOperation( QString& fileName, bool* interruptRequest )
171     : fileName_( fileName )
172 {
173     interruptRequest_ = interruptRequest;
174 }
175 
176 PartialIndexOperation::PartialIndexOperation( QString& fileName,
177         bool* interruptRequest, qint64 position )
178     : IndexOperation( fileName, interruptRequest )
179 {
180     initialPosition_ = position;
181 }
182 
183 qint64 IndexOperation::doIndex( LinePositionArray& linePosition, int* maxLength,
184         qint64 initialPosition )
185 {
186     int max_length = *maxLength;
187     qint64 pos = initialPosition; // Absolute position of the start of current line
188     qint64 end = 0;               // Absolute position of the end of current line
189     int additional_spaces = 0;    // Additional spaces due to tabs
190 
191     QFile file( fileName_ );
192     if ( file.open( QIODevice::ReadOnly ) ) {
193         // Count the number of lines and max length
194         // (read big chunks to speed up reading from disk)
195         file.seek( pos );
196         while ( !file.atEnd() ) {
197             if ( *interruptRequest_ )   // a bool is always read/written atomically isn't it?
198                 break;
199 
200             // Read a chunk of 5MB
201             const qint64 block_beginning = file.pos();
202             const QByteArray block = file.read( sizeChunk );
203 
204             // Count the number of lines in each chunk
205             qint64 pos_within_block = 0;
206             while ( pos_within_block != -1 ) {
207                 pos_within_block = qMax( pos - block_beginning, 0LL);
208                 // Looking for the next \n, expanding tabs in the process
209                 do {
210                     if ( pos_within_block < block.length() ) {
211                         const char c = block.at(pos_within_block);
212                         if ( c == '\n' )
213                             break;
214                         else if ( c == '\t' )
215                             additional_spaces += AbstractLogData::tabStop -
216                                 ( ( ( block_beginning - pos ) + pos_within_block
217                                     + additional_spaces ) % AbstractLogData::tabStop ) - 1;
218 
219                         pos_within_block++;
220                     }
221                     else {
222                         pos_within_block = -1;
223                     }
224                 } while ( pos_within_block != -1 );
225 
226                 // When a end of line has been found...
227                 if ( pos_within_block != -1 ) {
228                     end = pos_within_block + block_beginning;
229                     const int length = end-pos + additional_spaces;
230                     if ( length > max_length )
231                         max_length = length;
232                     pos = end + 1;
233                     additional_spaces = 0;
234                     linePosition.append( pos );
235                 }
236             }
237 
238             // Update the caller for progress indication
239             int progress = ( file.size() > 0 ) ? pos*100 / file.size() : 100;
240             emit indexingProgressed( progress );
241         }
242 
243         // Check if there is a non LF terminated line at the end of the file
244         if ( file.size() > pos ) {
245             LOG( logWARNING ) <<
246                 "Non LF terminated file, adding a fake end of line";
247             linePosition.append( file.size() + 1 );
248             linePosition.setFakeFinalLF();
249         }
250     }
251     else {
252         // TODO: Check that the file is seekable?
253         // If the file cannot be open, we do as if it was empty
254         LOG(logWARNING) << "Cannot open file " << fileName_.toStdString();
255 
256         emit indexingProgressed( 100 );
257     }
258 
259     *maxLength = max_length;
260 
261     return file.size();
262 }
263 
264 // Called in the worker thread's context
265 // Should not use any shared variable
266 bool FullIndexOperation::start( IndexingData& sharedData )
267 {
268     LOG(logDEBUG) << "FullIndexOperation::start(), file "
269         << fileName_.toStdString();
270 
271     LOG(logDEBUG) << "FullIndexOperation: Starting the count...";
272     int maxLength = 0;
273     LinePositionArray linePosition = LinePositionArray();
274 
275     emit indexingProgressed( 0 );
276 
277     qint64 size = doIndex( linePosition, &maxLength, 0 );
278 
279     if ( *interruptRequest_ == false )
280     {
281         // Commit the results to the shared data (atomically)
282         sharedData.setAll( size, maxLength, linePosition );
283     }
284 
285     LOG(logDEBUG) << "FullIndexOperation: ... finished counting."
286         "interrupt = " << *interruptRequest_;
287 
288     return ( *interruptRequest_ ? false : true );
289 }
290 
291 bool PartialIndexOperation::start( IndexingData& sharedData )
292 {
293     LOG(logDEBUG) << "PartialIndexOperation::start(), file "
294         << fileName_.toStdString();
295 
296     LOG(logDEBUG) << "PartialIndexOperation: Starting the count at "
297         << initialPosition_ << " ...";
298     int maxLength = 0;
299     LinePositionArray linePosition = LinePositionArray();
300 
301     emit indexingProgressed( 0 );
302 
303     qint64 size = doIndex( linePosition, &maxLength, initialPosition_ );
304 
305     if ( *interruptRequest_ == false )
306     {
307         // Commit the results to the shared data (atomically)
308         sharedData.addAll( size - initialPosition_, maxLength, linePosition );
309     }
310 
311     LOG(logDEBUG) << "PartialIndexOperation: ... finished counting.";
312 
313     return ( *interruptRequest_ ? false : true );
314 }
315