/***************************************************************************************** * * * OpenSpace * * * * Copyright (c) 2014-2017 * * * * Permission is hereby granted, free of charge, to any person obtaining a copy of this * * software and associated documentation files (the "Software"), to deal in the Software * * without restriction, including without limitation the rights to use, copy, modify, * * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * * permit persons to whom the Software is furnished to do so, subject to the following * * conditions: * * * * The above copyright notice and this permission notice shall be included in all copies * * or substantial portions of the Software. * * * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF * * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE * * OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * ****************************************************************************************/ namespace openspace::globebrowsing { template LRUThreadPoolWorker::LRUThreadPoolWorker(LRUThreadPool& pool) : _pool(pool) {} template void LRUThreadPoolWorker::operator()() { std::function task; while (true) { // acquire lock { std::unique_lock lock(_pool._queueMutex); // look for a work item while (!_pool._stop && _pool._queuedTasks.isEmpty()) { // if there are none wait for notification _pool._condition.wait(lock); } if (_pool._stop) { // exit if the pool is stopped return; } // get the task from the queue task = _pool._queuedTasks.popMRU().second; }// release lock // execute the task task(); } } template LRUThreadPool::LRUThreadPool(size_t numThreads, size_t queueSize) : _queuedTasks(queueSize) , _stop(false) { for (size_t i = 0; i < numThreads; ++i) { _workers.push_back(std::thread(LRUThreadPoolWorker(*this))); } } template LRUThreadPool::LRUThreadPool(const LRUThreadPool& toCopy) : LRUThreadPool(toCopy._workers.size(), toCopy._queuedTasks.maximumCacheSize()) { } // the destructor joins all threads template LRUThreadPool::~LRUThreadPool() { { std::unique_lock lock(_queueMutex); _stop = true; } _condition.notify_all(); // join them for (size_t i = 0; i < _workers.size(); ++i) { _workers[i].join(); } } // add new work item to the pool template void LRUThreadPool::enqueue(std::function f, KeyType key) { { // acquire lock std::unique_lock lock(_queueMutex); // add the task //_queuedTasks.put(key, f); std::vector>> unfinishedTasks = _queuedTasks.putAndFetchPopped(key, f); for (auto unfinishedTask : unfinishedTasks) { _unqueuedTasks.push_back(unfinishedTask.first); } } // release lock // wake up one thread _condition.notify_one(); } template bool LRUThreadPool::touch(KeyType key) { std::unique_lock lock(_queueMutex); return _queuedTasks.touch(key); } template std::vector LRUThreadPool::getUnqueuedTasksKeys() { std::vector toReturn = _unqueuedTasks; { std::unique_lock lock(_queueMutex); _unqueuedTasks.clear(); } return toReturn; } template std::vector LRUThreadPool::getQueuedTasksKeys() { std::vector queuedTasks; { std::unique_lock lock(_queueMutex); while (!_queuedTasks.isEmpty()) { queuedTasks.push_back(_queuedTasks.popMRU().first); } } return queuedTasks; } template void LRUThreadPool::clearEnqueuedTasks() { { // acquire lock std::unique_lock lock(_queueMutex); _queuedTasks.clear(); } // release lock } } // namespace openspace::globebrowsing