xref: /glogg/src/data/logdataworkerthread.cpp (revision c9a9366412fcda118aaadfe3742db8727a163a0f)
1 /*
2  * Copyright (C) 2009, 2010, 2014, 2015 Nicolas Bonnefon and other contributors
3  *
4  * This file is part of glogg.
5  *
6  * glogg is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * glogg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with glogg.  If not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #include <QFile>
21 
22 #include "log.h"
23 
24 #include "logdata.h"
25 #include "logdataworkerthread.h"
26 
27 // Size of the chunk to read (5 MiB)
28 const int IndexOperation::sizeChunk = 5*1024*1024;
29 
30 qint64 IndexingData::getSize() const
31 {
32     QMutexLocker locker( &dataMutex_ );
33 
34     return indexedSize_;
35 }
36 
37 int IndexingData::getMaxLength() const
38 {
39     QMutexLocker locker( &dataMutex_ );
40 
41     return maxLength_;
42 }
43 
44 LineNumber IndexingData::getNbLines() const
45 {
46     QMutexLocker locker( &dataMutex_ );
47 
48     return linePosition_.size();
49 }
50 
51 qint64 IndexingData::getPosForLine( LineNumber line ) const
52 {
53     QMutexLocker locker( &dataMutex_ );
54 
55     return linePosition_.at( line );
56 }
57 
58 EncodingSpeculator::Encoding IndexingData::getEncodingGuess() const
59 {
60     QMutexLocker locker( &dataMutex_ );
61 
62     return encoding_;
63 }
64 
65 void IndexingData::addAll( qint64 size, int length,
66         const FastLinePositionArray& linePosition,
67         EncodingSpeculator::Encoding encoding )
68 
69 {
70     QMutexLocker locker( &dataMutex_ );
71 
72     indexedSize_  += size;
73     maxLength_     = qMax( maxLength_, length );
74     linePosition_.append_list( linePosition );
75 
76     encoding_      = encoding;
77 }
78 
79 void IndexingData::clear()
80 {
81     maxLength_   = 0;
82     indexedSize_ = 0;
83     linePosition_ = LinePositionArray();
84     encoding_    = EncodingSpeculator::Encoding::ASCII7;
85 }
86 
87 LogDataWorkerThread::LogDataWorkerThread( IndexingData* indexing_data )
88     : QThread(), mutex_(), operationRequestedCond_(),
89     nothingToDoCond_(), fileName_(), indexing_data_( indexing_data )
90 {
91     terminate_          = false;
92     interruptRequested_ = false;
93     operationRequested_ = NULL;
94 }
95 
96 LogDataWorkerThread::~LogDataWorkerThread()
97 {
98     {
99         QMutexLocker locker( &mutex_ );
100         terminate_ = true;
101         operationRequestedCond_.wakeAll();
102     }
103     wait();
104 }
105 
106 void LogDataWorkerThread::attachFile( const QString& fileName )
107 {
108     QMutexLocker locker( &mutex_ );  // to protect fileName_
109 
110     fileName_ = fileName;
111 }
112 
113 void LogDataWorkerThread::indexAll()
114 {
115     QMutexLocker locker( &mutex_ );  // to protect operationRequested_
116 
117     LOG(logDEBUG) << "FullIndex requested";
118 
119     // If an operation is ongoing, we will block
120     while ( (operationRequested_ != NULL) )
121         nothingToDoCond_.wait( &mutex_ );
122 
123     interruptRequested_ = false;
124     operationRequested_ = new FullIndexOperation( fileName_,
125             indexing_data_, &interruptRequested_, &encodingSpeculator_ );
126     operationRequestedCond_.wakeAll();
127 }
128 
129 void LogDataWorkerThread::indexAdditionalLines( qint64 position )
130 {
131     QMutexLocker locker( &mutex_ );  // to protect operationRequested_
132 
133     LOG(logDEBUG) << "AddLines requested";
134 
135     // If an operation is ongoing, we will block
136     while ( (operationRequested_ != NULL) )
137         nothingToDoCond_.wait( &mutex_ );
138 
139     interruptRequested_ = false;
140     operationRequested_ = new PartialIndexOperation( fileName_,
141             indexing_data_, &interruptRequested_, &encodingSpeculator_, position );
142     operationRequestedCond_.wakeAll();
143 }
144 
145 void LogDataWorkerThread::interrupt()
146 {
147     LOG(logDEBUG) << "Load interrupt requested";
148 
149     // No mutex here, setting a bool is probably atomic!
150     interruptRequested_ = true;
151 }
152 
153 // This is the thread's main loop
154 void LogDataWorkerThread::run()
155 {
156     QMutexLocker locker( &mutex_ );
157 
158     forever {
159         while ( (terminate_ == false) && (operationRequested_ == NULL) )
160             operationRequestedCond_.wait( &mutex_ );
161         LOG(logDEBUG) << "Worker thread signaled";
162 
163         // Look at what needs to be done
164         if ( terminate_ )
165             return;      // We must die
166 
167         if ( operationRequested_ ) {
168             connect( operationRequested_, SIGNAL( indexingProgressed( int ) ),
169                     this, SIGNAL( indexingProgressed( int ) ) );
170 
171             // Run the operation
172             try {
173                 if ( operationRequested_->start() ) {
174                     LOG(logDEBUG) << "... finished copy in workerThread.";
175                     emit indexingFinished( LoadingStatus::Successful );
176                 }
177                 else {
178                     emit indexingFinished( LoadingStatus::Interrupted );
179                 }
180             }
181             catch ( std::bad_alloc& ba ) {
182                 LOG(logERROR) << "Out of memory whilst indexing!";
183                 emit indexingFinished( LoadingStatus::NoMemory );
184             }
185 
186             delete operationRequested_;
187             operationRequested_ = NULL;
188             nothingToDoCond_.wakeAll();
189         }
190     }
191 }
192 
193 //
194 // Operations implementation
195 //
196 
197 IndexOperation::IndexOperation( const QString& fileName,
198         IndexingData* indexingData, bool* interruptRequest,
199         EncodingSpeculator* encodingSpeculator )
200     : fileName_( fileName )
201 {
202     interruptRequest_ = interruptRequest;
203     indexing_data_ = indexingData;
204     encoding_speculator_ = encodingSpeculator;
205 }
206 
207 PartialIndexOperation::PartialIndexOperation( const QString& fileName,
208         IndexingData* indexingData, bool* interruptRequest,
209         EncodingSpeculator* speculator, qint64 position )
210     : IndexOperation( fileName, indexingData, interruptRequest, speculator )
211 {
212     initialPosition_ = position;
213 }
214 
215 void IndexOperation::doIndex( IndexingData* indexing_data,
216         EncodingSpeculator* encoding_speculator, qint64 initialPosition )
217 {
218     qint64 pos = initialPosition; // Absolute position of the start of current line
219     qint64 end = 0;               // Absolute position of the end of current line
220     int additional_spaces = 0;    // Additional spaces due to tabs
221 
222     QFile file( fileName_ );
223     if ( file.open( QIODevice::ReadOnly ) ) {
224         // Count the number of lines and max length
225         // (read big chunks to speed up reading from disk)
226         file.seek( pos );
227         while ( !file.atEnd() ) {
228             FastLinePositionArray line_positions;
229             int max_length = 0;
230 
231             if ( *interruptRequest_ )   // a bool is always read/written atomically isn't it?
232                 break;
233 
234             // Read a chunk of 5MB
235             const qint64 block_beginning = file.pos();
236             const QByteArray block = file.read( sizeChunk );
237 
238             // Count the number of lines in each chunk
239             qint64 pos_within_block = 0;
240             while ( pos_within_block != -1 ) {
241                 pos_within_block = qMax( pos - block_beginning, 0LL);
242                 // Looking for the next \n, expanding tabs in the process
243                 do {
244                     if ( pos_within_block < block.length() ) {
245                         const char c = block.at(pos_within_block);
246                         encoding_speculator->inject_byte( c );
247                         if ( c == '\n' )
248                             break;
249                         else if ( c == '\t' )
250                             additional_spaces += AbstractLogData::tabStop -
251                                 ( ( ( block_beginning - pos ) + pos_within_block
252                                     + additional_spaces ) % AbstractLogData::tabStop ) - 1;
253 
254                         pos_within_block++;
255                     }
256                     else {
257                         pos_within_block = -1;
258                     }
259                 } while ( pos_within_block != -1 );
260 
261                 // When a end of line has been found...
262                 if ( pos_within_block != -1 ) {
263                     end = pos_within_block + block_beginning;
264                     const int length = end-pos + additional_spaces;
265                     if ( length > max_length )
266                         max_length = length;
267                     pos = end + 1;
268                     additional_spaces = 0;
269                     line_positions.append( pos );
270                 }
271             }
272 
273             // Update the shared data
274             indexing_data->addAll( block.length(), max_length, line_positions,
275                    encoding_speculator->guess() );
276 
277             // Update the caller for progress indication
278             int progress = ( file.size() > 0 ) ? pos*100 / file.size() : 100;
279             emit indexingProgressed( progress );
280         }
281 
282         // Check if there is a non LF terminated line at the end of the file
283         qint64 file_size = file.size();
284         if ( !*interruptRequest_ && file_size > pos ) {
285             LOG( logWARNING ) <<
286                 "Non LF terminated file, adding a fake end of line";
287 
288             FastLinePositionArray line_position;
289             line_position.append( file_size + 1 );
290             line_position.setFakeFinalLF();
291 
292             indexing_data->addAll( 0, 0, line_position, encoding_speculator->guess() );
293         }
294     }
295     else {
296         // TODO: Check that the file is seekable?
297         // If the file cannot be open, we do as if it was empty
298         LOG(logWARNING) << "Cannot open file " << fileName_.toStdString();
299 
300         emit indexingProgressed( 100 );
301     }
302 }
303 
304 // Called in the worker thread's context
305 bool FullIndexOperation::start()
306 {
307     LOG(logDEBUG) << "FullIndexOperation::start(), file "
308         << fileName_.toStdString();
309 
310     LOG(logDEBUG) << "FullIndexOperation: Starting the count...";
311 
312     emit indexingProgressed( 0 );
313 
314     // First empty the index
315     indexing_data_->clear();
316 
317     doIndex( indexing_data_, encoding_speculator_, 0 );
318 
319     LOG(logDEBUG) << "FullIndexOperation: ... finished counting."
320         "interrupt = " << *interruptRequest_;
321 
322     return ( *interruptRequest_ ? false : true );
323 }
324 
325 bool PartialIndexOperation::start()
326 {
327     LOG(logDEBUG) << "PartialIndexOperation::start(), file "
328         << fileName_.toStdString();
329 
330     LOG(logDEBUG) << "PartialIndexOperation: Starting the count at "
331         << initialPosition_ << " ...";
332 
333     emit indexingProgressed( 0 );
334 
335     doIndex( indexing_data_, encoding_speculator_, initialPosition_ );
336 
337     LOG(logDEBUG) << "PartialIndexOperation: ... finished counting.";
338 
339     return ( *interruptRequest_ ? false : true );
340 }
341