xref: /glogg/src/watchtower.h (revision fcaa75578d41fbd1a1b35740bd5dc8176496a14b)
1 #ifndef WATCHTOWER_H
2 #define WATCHTOWER_H
3 
4 #include "config.h"
5 
6 #include <memory>
7 #include <atomic>
8 #include <thread>
9 #include <mutex>
10 
11 #include "watchtowerlist.h"
12 
// Allow the client to register for notifications on an arbitrary number
// of files. The client is notified of any change
// (creation/deletion/modification) to those files.
// The class is parameterised on a platform-specific driver type.
17 
// FIXME: Where to put this so that it is not dependent (on this header,
// presumably -- confirm)?
// Registration is the opaque handle used to implement RAII: the lifetime
// of a notification is tied to the handle returned by addFile().
#if !defined( HAS_TEMPLATE_ALIASES )
typedef std::shared_ptr<void> Registration;
#else
using Registration = std::shared_ptr<void>;
#endif
25 
26 template<typename Driver>
27 class WatchTower {
28   public:
29     // Create an empty watchtower
30     WatchTower();
31     // Destroy the object
32     ~WatchTower();
33 
34     // Set the polling interval (in ms)
35     // 0 disables polling and is the default
36     void setPollingInterval( int interval_ms );
37 
38     // Add a file to the notification list. notification will be called when
39     // the file is modified, moved or deleted.
40     // Lifetime of the notification is tied to the Registration object returned.
41     // Note the notification function is called with a mutex held and in a
42     // third party thread, beware of races!
43     Registration addFile( const std::string& file_name,
44             std::function<void()> notification );
45 
46     // Number of watched directories (for tests)
47     unsigned int numberWatchedDirectories() const;
48 
49   private:
50     // The driver (parametrised)
51     Driver driver_;
52 
53     // List of files/dirs observed
54     ObservedFileList<Driver> observed_file_list_;
55 
56     // Protects the observed_file_list_
57     std::mutex observers_mutex_;
58 
59     // Polling interval (0 disables polling)
60     uint32_t polling_interval_ms_ = 0;
61 
62     // Thread
63     std::atomic_bool running_;
64     std::thread thread_;
65 
66     // Exist as long as the onject exists, to ensure observers won't try to
67     // call us if we are dead.
68     std::shared_ptr<void> heartBeat_;
69 
70     // Private member functions
71     std::tuple<typename Driver::FileId, typename Driver::SymlinkId>
72         addFileToDriver( const std::string& );
73     static void removeNotification( WatchTower* watch_tower,
74             std::shared_ptr<void> notification );
75     void run();
76 };
77 
78 // Class template implementation
79 
80 #include <algorithm>
81 
82 #include <sys/types.h>
83 #include <sys/stat.h>
84 #include <unistd.h>
85 
86 #include "log.h"
87 
// Forward declarations of the file-local helpers used by the template
// implementation below. They live in an unnamed namespace, so every
// translation unit including this header gets its own copy.
namespace {
    // True when file_name names a symbolic link (defined at the end of
    // this header).
    bool isSymLink( const std::string& file_name );
    // NOTE(review): no definition of directory_path is visible in this
    // header -- confirm where (or whether) it is defined and used.
    std::string directory_path( const std::string& path );
}
92 
93 template <typename Driver>
94 WatchTower<Driver>::WatchTower()
95     : driver_(), thread_(),
96     heartBeat_(std::shared_ptr<void>((void*) 0xDEADC0DE, [] (void*) {}))
97 {
98     running_ = true;
99     thread_ = std::thread( &WatchTower::run, this );
100 }
101 
102 template <typename Driver>
103 WatchTower<Driver>::~WatchTower()
104 {
105     running_ = false;
106     driver_.interruptWait();
107     thread_.join();
108 }
109 
110 template <typename Driver>
111 void WatchTower<Driver>::setPollingInterval( int interval_ms )
112 {
113     polling_interval_ms_ = interval_ms;
114 }
115 
116 template <typename Driver>
117 Registration WatchTower<Driver>::addFile(
118         const std::string& file_name,
119         std::function<void()> notification )
120 {
121     // LOG(logDEBUG) << "WatchTower::addFile " << file_name;
122 
123     std::weak_ptr<void> weakHeartBeat(heartBeat_);
124 
125     std::lock_guard<std::mutex> lock( observers_mutex_ );
126 
127     auto existing_observed_file =
128         observed_file_list_.searchByName( file_name );
129 
130     std::shared_ptr<std::function<void()>> ptr( new std::function<void()>(std::move( notification )) );
131 
132     if ( ! existing_observed_file )
133     {
134         typename Driver::FileId file_id;
135         typename Driver::SymlinkId symlink_id;
136 
137         std::tie( file_id, symlink_id ) = addFileToDriver( file_name );
138         auto new_file = observed_file_list_.addNewObservedFile(
139                 ObservedFile<Driver>( file_name, ptr, file_id, symlink_id ) );
140 
141         auto dir = observed_file_list_.watchedDirectoryForFile( file_name );
142         if ( ! dir ) {
143             LOG(logDEBUG) << "WatchTower::addFile dir for " << file_name
144                 << " not watched, adding...";
145 
146             dir = observed_file_list_.addWatchedDirectoryForFile( file_name,
147                     [this, weakHeartBeat] (ObservedDir<Driver>* dir) {
148                         if ( auto heart_beat = weakHeartBeat.lock() ) {
149                             driver_.removeDir( dir->dir_id_ );
150                         } } );
151 
152             dir->dir_id_ = driver_.addDir( dir->path );
153 
154             if ( ! dir->dir_id_.valid() ) {
155                 LOG(logWARNING) << "WatchTower::addFile driver failed to add dir";
156                 dir = nullptr;
157             }
158         }
159         else {
160             LOG(logDEBUG) << "WatchTower::addFile Found exisiting watch for dir " << file_name;
161         }
162 
163         // Associate the dir to the file
164         if ( dir )
165             new_file->dir_ = dir;
166     }
167     else
168     {
169         existing_observed_file->addCallback( ptr );
170     }
171 
172     // Returns a shared pointer that removes its own entry
173     // from the list of watched stuff when it goes out of scope!
174     // Uses a custom deleter to do the work.
175     return std::shared_ptr<void>( 0x0, [this, ptr, weakHeartBeat] (void*) {
176             if ( auto heart_beat = weakHeartBeat.lock() )
177                 WatchTower<Driver>::removeNotification( this, ptr );
178             } );
179 }
180 
181 template <typename Driver>
182 unsigned int WatchTower<Driver>::numberWatchedDirectories() const
183 {
184     return observed_file_list_.numberWatchedDirectories();
185 }
186 
187 //
188 // Private functions
189 //
190 
191 // Add the passed file name to the driver, returning the file and symlink id
192 template <typename Driver>
193 std::tuple<typename Driver::FileId, typename Driver::SymlinkId>
194 WatchTower<Driver>::addFileToDriver( const std::string& file_name )
195 {
196     typename Driver::SymlinkId symlink_id;
197     auto file_id = driver_.addFile( file_name );
198 
199     if ( isSymLink( file_name ) )
200     {
201         // We want to follow the name (as opposed to the inode)
202         // so we watch the symlink as well.
203         symlink_id = driver_.addSymlink( file_name );
204     }
205 
206     return std::make_tuple( file_id, symlink_id );
207 }
208 
209 // Called by the dtor for a registration object
210 template <typename Driver>
211 void WatchTower<Driver>::removeNotification(
212         WatchTower* watch_tower, std::shared_ptr<void> notification )
213 {
214     LOG(logDEBUG) << "WatchTower::removeNotification";
215 
216     std::lock_guard<std::mutex> lock( watch_tower->observers_mutex_ );
217 
218     auto file =
219         watch_tower->observed_file_list_.removeCallback( notification );
220 
221     if ( file )
222     {
223         watch_tower->driver_.removeFile( file->file_id_ );
224         watch_tower->driver_.removeSymlink( file->symlink_id_ );
225     }
226 }
227 
228 // Run in its own thread
229 template <typename Driver>
230 void WatchTower<Driver>::run()
231 {
232     while ( running_ ) {
233         std::unique_lock<std::mutex> lock( observers_mutex_ );
234 
235         std::vector<ObservedFile<Driver>*> files_needing_readding;
236 
237         auto files = driver_.waitAndProcessEvents(
238                 &observed_file_list_, &lock, &files_needing_readding, polling_interval_ms_ );
239         LOG(logDEBUG) << "WatchTower::run: waitAndProcessEvents returned "
240             << files.size() << " files, " << files_needing_readding.size()
241             << " needing re-adding";
242 
243         for ( auto file: files_needing_readding ) {
244             // A file 'needing readding' has the same name,
245             // but probably a different inode, so it needs
246             // to be readded for some drivers that rely on the
247             // inode (e.g. inotify)
248             driver_.removeFile( file->file_id_ );
249             driver_.removeSymlink( file->symlink_id_ );
250 
251             std::tie( file->file_id_, file->symlink_id_ ) =
252                 addFileToDriver( file->file_name_ );
253         }
254 
255         for ( auto file: files ) {
256             for ( auto observer: file->callbacks ) {
257                 LOG(logDEBUG) << "WatchTower::run: notifying the client!";
258                 // Here we have to cast our generic pointer back to
259                 // the function pointer in order to perform the call
260                 const std::shared_ptr<std::function<void()>> fptr =
261                     std::static_pointer_cast<std::function<void()>>( observer );
262                 // The observer is called with the mutex held,
263                 // Let's hope it doesn't do anything too funky.
264                 (*fptr)();
265 
266                 file->markAsChanged();
267             }
268         }
269 
270         if ( polling_interval_ms_ > 0 ) {
271             // Also call files that have not been called for a while
272             for ( auto file: observed_file_list_ ) {
273                 uint32_t ms_since_last_check =
274                     std::chrono::duration_cast<std::chrono::milliseconds>(
275                             std::chrono::steady_clock::now() - file->timeForLastCheck() ).count();
276                 if ( ( ms_since_last_check > polling_interval_ms_ ) && file->hasChanged() ) {
277                     LOG(logDEBUG) << "WatchTower::run: " << file->file_name_;
278                     for ( auto observer: file->callbacks ) {
279                         LOG(logDEBUG) << "WatchTower::run: notifying the client because of a timeout!";
280                         // Here we have to cast our generic pointer back to
281                         // the function pointer in order to perform the call
282                         const std::shared_ptr<std::function<void()>> fptr =
283                             std::static_pointer_cast<std::function<void()>>( observer );
284                         // The observer is called with the mutex held,
285                         // Let's hope it doesn't do anything too funky.
286                         (*fptr)();
287                     }
288                     file->markAsChanged();
289                 }
290             }
291         }
292     }
293 }
294 
namespace {
    // Tell whether file_name itself is a symbolic link.
    // Returns false when the file cannot be examined (e.g. it does not
    // exist), and always false when HAVE_SYMLINK is not defined.
    bool isSymLink( const std::string& file_name )
    {
#ifdef HAVE_SYMLINK
        struct stat buf;

        // On failure lstat() does not fill buf, so its content must not
        // be inspected.
        if ( lstat( file_name.c_str(), &buf ) != 0 )
            return false;

        return S_ISLNK( buf.st_mode );
#else
        return false;
#endif
    }
}
308 
309 #endif
310