xref: /glogg/src/data/logdataworkerthread.cpp (revision 6e2e573c451ec24d720c967fab68538eaa3f3ab5)
1 /*
2  * Copyright (C) 2009, 2010, 2014, 2015 Nicolas Bonnefon and other contributors
3  *
4  * This file is part of glogg.
5  *
6  * glogg is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * glogg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with glogg.  If not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #include <QFile>
21 
22 #include "log.h"
23 
24 #include "logdata.h"
25 #include "logdataworkerthread.h"
26 
// Number of bytes requested from the file by each read while indexing
// (5 * 1024 * 1024 bytes, i.e. 5 MiB).
const int IndexOperation::sizeChunk = 5*1024*1024;
29 
30 qint64 IndexingData::getSize() const
31 {
32     QMutexLocker locker( &dataMutex_ );
33 
34     return indexedSize_;
35 }
36 
37 int IndexingData::getMaxLength() const
38 {
39     QMutexLocker locker( &dataMutex_ );
40 
41     return maxLength_;
42 }
43 
44 LineNumber IndexingData::getNbLines() const
45 {
46     QMutexLocker locker( &dataMutex_ );
47 
48     return linePosition_.size();
49 }
50 
51 qint64 IndexingData::getPosForLine( LineNumber line ) const
52 {
53     QMutexLocker locker( &dataMutex_ );
54 
55     return linePosition_.at( line );
56 }
57 
58 void IndexingData::addAll( qint64 size, int length,
59         const FastLinePositionArray& linePosition )
60 {
61     QMutexLocker locker( &dataMutex_ );
62 
63     indexedSize_  += size;
64     maxLength_     = qMax( maxLength_, length );
65     linePosition_.append_list( linePosition );
66 }
67 
68 void IndexingData::clear()
69 {
70     maxLength_   = 0;
71     indexedSize_ = 0;
72     linePosition_ = LinePositionArray();
73 }
74 
75 LogDataWorkerThread::LogDataWorkerThread( IndexingData* indexing_data )
76     : QThread(), mutex_(), operationRequestedCond_(),
77     nothingToDoCond_(), fileName_(), indexing_data_( indexing_data )
78 {
79     terminate_          = false;
80     interruptRequested_ = false;
81     operationRequested_ = NULL;
82 }
83 
84 LogDataWorkerThread::~LogDataWorkerThread()
85 {
86     {
87         QMutexLocker locker( &mutex_ );
88         terminate_ = true;
89         operationRequestedCond_.wakeAll();
90     }
91     wait();
92 }
93 
94 void LogDataWorkerThread::attachFile( const QString& fileName )
95 {
96     QMutexLocker locker( &mutex_ );  // to protect fileName_
97 
98     fileName_ = fileName;
99 }
100 
101 void LogDataWorkerThread::indexAll()
102 {
103     QMutexLocker locker( &mutex_ );  // to protect operationRequested_
104 
105     LOG(logDEBUG) << "FullIndex requested";
106 
107     // If an operation is ongoing, we will block
108     while ( (operationRequested_ != NULL) )
109         nothingToDoCond_.wait( &mutex_ );
110 
111     interruptRequested_ = false;
112     operationRequested_ = new FullIndexOperation( fileName_,
113             indexing_data_, &interruptRequested_ );
114     operationRequestedCond_.wakeAll();
115 }
116 
117 void LogDataWorkerThread::indexAdditionalLines( qint64 position )
118 {
119     QMutexLocker locker( &mutex_ );  // to protect operationRequested_
120 
121     LOG(logDEBUG) << "AddLines requested";
122 
123     // If an operation is ongoing, we will block
124     while ( (operationRequested_ != NULL) )
125         nothingToDoCond_.wait( &mutex_ );
126 
127     interruptRequested_ = false;
128     operationRequested_ = new PartialIndexOperation( fileName_,
129             indexing_data_, &interruptRequested_, position );
130     operationRequestedCond_.wakeAll();
131 }
132 
// Asks the operation currently running (if any) to stop as soon as it
// can, by raising the flag whose address was handed to the operation
// when it was created.
void LogDataWorkerThread::interrupt()
{
    LOG(logDEBUG) << "Load interrupt requested";

    // mutex_ is deliberately not taken here: run() keeps it locked for the
    // whole duration of an operation (it is only released while waiting on
    // a condition), so locking it would delay the request until the
    // operation is over.
    // NOTE(review): the flag is a plain bool written here and read by the
    // operation without synchronisation; presumably this is called from
    // another thread, which the C++ memory model treats as a data race.
    // Consider an atomic type for the member.
    interruptRequested_ = true;
}
140 
141 // This is the thread's main loop
142 void LogDataWorkerThread::run()
143 {
144     QMutexLocker locker( &mutex_ );
145 
146     forever {
147         while ( (terminate_ == false) && (operationRequested_ == NULL) )
148             operationRequestedCond_.wait( &mutex_ );
149         LOG(logDEBUG) << "Worker thread signaled";
150 
151         // Look at what needs to be done
152         if ( terminate_ )
153             return;      // We must die
154 
155         if ( operationRequested_ ) {
156             connect( operationRequested_, SIGNAL( indexingProgressed( int ) ),
157                     this, SIGNAL( indexingProgressed( int ) ) );
158 
159             // Run the operation
160             try {
161                 if ( operationRequested_->start() ) {
162                     LOG(logDEBUG) << "... finished copy in workerThread.";
163                     emit indexingFinished( LoadingStatus::Successful );
164                 }
165                 else {
166                     emit indexingFinished( LoadingStatus::Interrupted );
167                 }
168             }
169             catch ( std::bad_alloc& ba ) {
170                 LOG(logERROR) << "Out of memory whilst indexing!";
171                 emit indexingFinished( LoadingStatus::NoMemory );
172             }
173 
174             delete operationRequested_;
175             operationRequested_ = NULL;
176             nothingToDoCond_.wakeAll();
177         }
178     }
179 }
180 
181 //
182 // Operations implementation
183 //
184 
185 IndexOperation::IndexOperation( const QString& fileName,
186        IndexingData* indexingData, bool* interruptRequest )
187     : fileName_( fileName )
188 {
189     interruptRequest_ = interruptRequest;
190     indexing_data_ = indexingData;
191 }
192 
193 PartialIndexOperation::PartialIndexOperation( const QString& fileName,
194         IndexingData* indexingData, bool* interruptRequest, qint64 position )
195     : IndexOperation( fileName, indexingData, interruptRequest )
196 {
197     initialPosition_ = position;
198 }
199 
200 void IndexOperation::doIndex( IndexingData* indexing_data, qint64 initialPosition )
201 {
202     qint64 pos = initialPosition; // Absolute position of the start of current line
203     qint64 end = 0;               // Absolute position of the end of current line
204     int additional_spaces = 0;    // Additional spaces due to tabs
205 
206     QFile file( fileName_ );
207     if ( file.open( QIODevice::ReadOnly ) ) {
208         // Count the number of lines and max length
209         // (read big chunks to speed up reading from disk)
210         file.seek( pos );
211         while ( !file.atEnd() ) {
212             FastLinePositionArray line_positions;
213             int max_length = 0;
214 
215             if ( *interruptRequest_ )   // a bool is always read/written atomically isn't it?
216                 break;
217 
218             // Read a chunk of 5MB
219             const qint64 block_beginning = file.pos();
220             const QByteArray block = file.read( sizeChunk );
221 
222             // Count the number of lines in each chunk
223             qint64 pos_within_block = 0;
224             while ( pos_within_block != -1 ) {
225                 pos_within_block = qMax( pos - block_beginning, 0LL);
226                 // Looking for the next \n, expanding tabs in the process
227                 do {
228                     if ( pos_within_block < block.length() ) {
229                         const char c = block.at(pos_within_block);
230                         if ( c == '\n' )
231                             break;
232                         else if ( c == '\t' )
233                             additional_spaces += AbstractLogData::tabStop -
234                                 ( ( ( block_beginning - pos ) + pos_within_block
235                                     + additional_spaces ) % AbstractLogData::tabStop ) - 1;
236 
237                         pos_within_block++;
238                     }
239                     else {
240                         pos_within_block = -1;
241                     }
242                 } while ( pos_within_block != -1 );
243 
244                 // When a end of line has been found...
245                 if ( pos_within_block != -1 ) {
246                     end = pos_within_block + block_beginning;
247                     const int length = end-pos + additional_spaces;
248                     if ( length > max_length )
249                         max_length = length;
250                     pos = end + 1;
251                     additional_spaces = 0;
252                     line_positions.append( pos );
253                 }
254             }
255 
256             // Update the shared data
257             indexing_data->addAll( block.length(), max_length, line_positions );
258 
259             // Update the caller for progress indication
260             int progress = ( file.size() > 0 ) ? pos*100 / file.size() : 100;
261             emit indexingProgressed( progress );
262         }
263 
264         // Check if there is a non LF terminated line at the end of the file
265         qint64 file_size = file.size();
266         if ( !*interruptRequest_ && file_size > pos ) {
267             LOG( logWARNING ) <<
268                 "Non LF terminated file, adding a fake end of line";
269 
270             FastLinePositionArray line_position;
271             line_position.append( file_size + 1 );
272             line_position.setFakeFinalLF();
273 
274             indexing_data->addAll( 0, 0, line_position );
275         }
276     }
277     else {
278         // TODO: Check that the file is seekable?
279         // If the file cannot be open, we do as if it was empty
280         LOG(logWARNING) << "Cannot open file " << fileName_.toStdString();
281 
282         emit indexingProgressed( 100 );
283     }
284 }
285 
286 // Called in the worker thread's context
287 bool FullIndexOperation::start()
288 {
289     LOG(logDEBUG) << "FullIndexOperation::start(), file "
290         << fileName_.toStdString();
291 
292     LOG(logDEBUG) << "FullIndexOperation: Starting the count...";
293 
294     emit indexingProgressed( 0 );
295 
296     // First empty the index
297     indexing_data_->clear();
298 
299     doIndex( indexing_data_, 0 );
300 
301     LOG(logDEBUG) << "FullIndexOperation: ... finished counting."
302         "interrupt = " << *interruptRequest_;
303 
304     return ( *interruptRequest_ ? false : true );
305 }
306 
307 bool PartialIndexOperation::start()
308 {
309     LOG(logDEBUG) << "PartialIndexOperation::start(), file "
310         << fileName_.toStdString();
311 
312     LOG(logDEBUG) << "PartialIndexOperation: Starting the count at "
313         << initialPosition_ << " ...";
314 
315     emit indexingProgressed( 0 );
316 
317     doIndex( indexing_data_, initialPosition_ );
318 
319     LOG(logDEBUG) << "PartialIndexOperation: ... finished counting.";
320 
321     return ( *interruptRequest_ ? false : true );
322 }
323