1 #ifndef WATCHTOWER_H 2 #define WATCHTOWER_H 3 4 #include "config.h" 5 6 #include <memory> 7 #include <atomic> 8 #include <thread> 9 #include <mutex> 10 11 #include "watchtowerlist.h" 12 13 // Allow the client to register for notification on an arbitrary number 14 // of files. It will be notified to any change (creation/deletion/modification) 15 // on those files. 16 // It is passed a platform specific driver. 17 18 // FIXME: Where to put that so it is not dependant 19 // Registration object to implement RAII 20 #ifdef HAS_TEMPLATE_ALIASES 21 using Registration = std::shared_ptr<void>; 22 #else 23 typedef std::shared_ptr<void> Registration; 24 #endif 25 26 template<typename Driver> 27 class WatchTower { 28 public: 29 // Create an empty watchtower 30 WatchTower(); 31 // Destroy the object 32 ~WatchTower(); 33 34 // Set the polling interval (in ms) 35 // 0 disables polling and is the default 36 void setPollingInterval( int interval_ms ); 37 38 // Add a file to the notification list. notification will be called when 39 // the file is modified, moved or deleted. 40 // Lifetime of the notification is tied to the Registration object returned. 41 // Note the notification function is called with a mutex held and in a 42 // third party thread, beware of races! 43 Registration addFile( const std::string& file_name, 44 std::function<void()> notification ); 45 46 // Number of watched directories (for tests) 47 unsigned int numberWatchedDirectories() const; 48 49 private: 50 // The driver (parametrised) 51 Driver driver_; 52 53 // List of files/dirs observed 54 ObservedFileList<Driver> observed_file_list_; 55 56 // Protects the observed_file_list_ 57 std::mutex observers_mutex_; 58 59 // Polling interval (0 disables polling) 60 uint32_t polling_interval_ms_ = 0; 61 62 // Thread 63 std::atomic_bool running_; 64 std::thread thread_; 65 66 // Exist as long as the onject exists, to ensure observers won't try to 67 // call us if we are dead. 68 std::shared_ptr<void> heartBeat_; 69 70 // Private member functions 71 std::tuple<typename Driver::FileId, typename Driver::SymlinkId> 72 addFileToDriver( const std::string& ); 73 static void removeNotification( WatchTower* watch_tower, 74 std::shared_ptr<void> notification ); 75 void run(); 76 }; 77 78 // Class template implementation 79 80 #include <algorithm> 81 82 #include <sys/types.h> 83 #include <sys/stat.h> 84 #include <unistd.h> 85 86 #include "log.h" 87 88 namespace { 89 bool isSymLink( const std::string& file_name ); 90 std::string directory_path( const std::string& path ); 91 }; 92 93 template <typename Driver> 94 WatchTower<Driver>::WatchTower() 95 : driver_(), thread_(), 96 heartBeat_(std::shared_ptr<void>((void*) 0xDEADC0DE, [] (void*) {})) 97 { 98 running_ = true; 99 thread_ = std::thread( &WatchTower::run, this ); 100 } 101 102 template <typename Driver> 103 WatchTower<Driver>::~WatchTower() 104 { 105 running_ = false; 106 driver_.interruptWait(); 107 thread_.join(); 108 } 109 110 template <typename Driver> 111 void WatchTower<Driver>::setPollingInterval( int interval_ms ) 112 { 113 polling_interval_ms_ = interval_ms; 114 } 115 116 template <typename Driver> 117 Registration WatchTower<Driver>::addFile( 118 const std::string& file_name, 119 std::function<void()> notification ) 120 { 121 // LOG(logDEBUG) << "WatchTower::addFile " << file_name; 122 123 std::weak_ptr<void> weakHeartBeat(heartBeat_); 124 125 std::lock_guard<std::mutex> lock( observers_mutex_ ); 126 127 auto existing_observed_file = 128 observed_file_list_.searchByName( file_name ); 129 130 std::shared_ptr<std::function<void()>> ptr( new std::function<void()>(std::move( notification )) ); 131 132 if ( ! existing_observed_file ) 133 { 134 typename Driver::FileId file_id; 135 typename Driver::SymlinkId symlink_id; 136 137 std::tie( file_id, symlink_id ) = addFileToDriver( file_name ); 138 auto new_file = observed_file_list_.addNewObservedFile( 139 ObservedFile<Driver>( file_name, ptr, file_id, symlink_id ) ); 140 141 auto dir = observed_file_list_.watchedDirectoryForFile( file_name ); 142 if ( ! dir ) { 143 LOG(logDEBUG) << "WatchTower::addFile dir for " << file_name 144 << " not watched, adding..."; 145 146 dir = observed_file_list_.addWatchedDirectoryForFile( file_name, 147 [this, weakHeartBeat] (ObservedDir<Driver>* dir) { 148 if ( auto heart_beat = weakHeartBeat.lock() ) { 149 driver_.removeDir( dir->dir_id_ ); 150 } } ); 151 152 dir->dir_id_ = driver_.addDir( dir->path ); 153 154 if ( ! dir->dir_id_.valid() ) { 155 LOG(logWARNING) << "WatchTower::addFile driver failed to add dir"; 156 dir = nullptr; 157 } 158 } 159 else { 160 LOG(logDEBUG) << "WatchTower::addFile Found exisiting watch for dir " << file_name; 161 } 162 163 // Associate the dir to the file 164 if ( dir ) 165 new_file->dir_ = dir; 166 } 167 else 168 { 169 existing_observed_file->addCallback( ptr ); 170 } 171 172 // Returns a shared pointer that removes its own entry 173 // from the list of watched stuff when it goes out of scope! 174 // Uses a custom deleter to do the work. 175 return std::shared_ptr<void>( 0x0, [this, ptr, weakHeartBeat] (void*) { 176 if ( auto heart_beat = weakHeartBeat.lock() ) 177 WatchTower<Driver>::removeNotification( this, ptr ); 178 } ); 179 } 180 181 template <typename Driver> 182 unsigned int WatchTower<Driver>::numberWatchedDirectories() const 183 { 184 return observed_file_list_.numberWatchedDirectories(); 185 } 186 187 // 188 // Private functions 189 // 190 191 // Add the passed file name to the driver, returning the file and symlink id 192 template <typename Driver> 193 std::tuple<typename Driver::FileId, typename Driver::SymlinkId> 194 WatchTower<Driver>::addFileToDriver( const std::string& file_name ) 195 { 196 typename Driver::SymlinkId symlink_id; 197 auto file_id = driver_.addFile( file_name ); 198 199 if ( isSymLink( file_name ) ) 200 { 201 // We want to follow the name (as opposed to the inode) 202 // so we watch the symlink as well. 203 symlink_id = driver_.addSymlink( file_name ); 204 } 205 206 return std::make_tuple( file_id, symlink_id ); 207 } 208 209 // Called by the dtor for a registration object 210 template <typename Driver> 211 void WatchTower<Driver>::removeNotification( 212 WatchTower* watch_tower, std::shared_ptr<void> notification ) 213 { 214 LOG(logDEBUG) << "WatchTower::removeNotification"; 215 216 std::lock_guard<std::mutex> lock( watch_tower->observers_mutex_ ); 217 218 auto file = 219 watch_tower->observed_file_list_.removeCallback( notification ); 220 221 if ( file ) 222 { 223 watch_tower->driver_.removeFile( file->file_id_ ); 224 watch_tower->driver_.removeSymlink( file->symlink_id_ ); 225 } 226 } 227 228 // Run in its own thread 229 template <typename Driver> 230 void WatchTower<Driver>::run() 231 { 232 while ( running_ ) { 233 std::unique_lock<std::mutex> lock( observers_mutex_ ); 234 235 std::vector<ObservedFile<Driver>*> files_needing_readding; 236 237 auto files = driver_.waitAndProcessEvents( 238 &observed_file_list_, &lock, &files_needing_readding, polling_interval_ms_ ); 239 LOG(logDEBUG) << "WatchTower::run: waitAndProcessEvents returned " 240 << files.size() << " files, " << files_needing_readding.size() 241 << " needing re-adding"; 242 243 for ( auto file: files_needing_readding ) { 244 // A file 'needing readding' has the same name, 245 // but probably a different inode, so it needs 246 // to be readded for some drivers that rely on the 247 // inode (e.g. inotify) 248 driver_.removeFile( file->file_id_ ); 249 driver_.removeSymlink( file->symlink_id_ ); 250 251 std::tie( file->file_id_, file->symlink_id_ ) = 252 addFileToDriver( file->file_name_ ); 253 } 254 255 for ( auto file: files ) { 256 for ( auto observer: file->callbacks ) { 257 LOG(logDEBUG) << "WatchTower::run: notifying the client!"; 258 // Here we have to cast our generic pointer back to 259 // the function pointer in order to perform the call 260 const std::shared_ptr<std::function<void()>> fptr = 261 std::static_pointer_cast<std::function<void()>>( observer ); 262 // The observer is called with the mutex held, 263 // Let's hope it doesn't do anything too funky. 264 (*fptr)(); 265 266 file->markAsChanged(); 267 } 268 } 269 270 if ( polling_interval_ms_ > 0 ) { 271 // Also call files that have not been called for a while 272 for ( auto file: observed_file_list_ ) { 273 uint32_t ms_since_last_check = 274 std::chrono::duration_cast<std::chrono::milliseconds>( 275 std::chrono::steady_clock::now() - file->timeForLastCheck() ).count(); 276 if ( ( ms_since_last_check > polling_interval_ms_ ) && file->hasChanged() ) { 277 LOG(logDEBUG) << "WatchTower::run: " << file->file_name_; 278 for ( auto observer: file->callbacks ) { 279 LOG(logDEBUG) << "WatchTower::run: notifying the client because of a timeout!"; 280 // Here we have to cast our generic pointer back to 281 // the function pointer in order to perform the call 282 const std::shared_ptr<std::function<void()>> fptr = 283 std::static_pointer_cast<std::function<void()>>( observer ); 284 // The observer is called with the mutex held, 285 // Let's hope it doesn't do anything too funky. 286 (*fptr)(); 287 } 288 file->markAsChanged(); 289 } 290 } 291 } 292 } 293 } 294 295 namespace { 296 bool isSymLink( const std::string& file_name ) 297 { 298 #ifdef HAVE_SYMLINK 299 struct stat buf; 300 301 lstat( file_name.c_str(), &buf ); 302 return ( S_ISLNK(buf.st_mode) ); 303 #else 304 return false; 305 #endif 306 } 307 }; 308 309 #endif 310