diff --git a/include/sqlpp11/connection_pool.h b/include/sqlpp11/connection_pool.h index 92465dfa..5d5181da 100644 --- a/include/sqlpp11/connection_pool.h +++ b/include/sqlpp11/connection_pool.h @@ -29,55 +29,93 @@ #include #include +#include #include #include +#include +#include #include +#include namespace sqlpp { - template + namespace reconnect_policy + { + struct auto_reconnect { + template + void operator()(Connection* connection) + { + if(!connection->is_valid()) + connection->reconnect() + } + template + void clean(Connection* connection) {} + }; + + using namespace std::chrono_literals; + class periodic_reconnect + { + private: + std::chrono::seconds revalidate_after; + std::unordered_map > last_checked; + + public: + periodic_reconnect(const std::chrono::seconds r = 28800s) //default wait_timeout in MySQL + : revalidate_after(r), last_checked() {} + + template + void operator()(Connection* con) + { + auto last = last_checked.find(con); + auto now = std::chrono::system_clock::now(); + if(last == last_checked.end()) + { + if (!con->is_valid()) + { + con->reconnect(); + } + last_checked.emplace_hint(last, con, now); + } + else if(now - last->second > revalidate_after) + { + if (!con->is_valid()) + { + con->reconnect(); + } + last = now; + } + } + template + void clean(Connection* con) { + auto itr = last_checked.find(con); + if(itr != last_checked.end()) + { + last_checked.erase(itr); + } + } + }; + + struct never_reconnect { + template + void operator()(Connection*) {} + template + void clean(Connection*) {} + }; + } + + template ::value, Connection_config::connection>::type, + typename Reconnect_policy = reconnect_policy::auto_reconnect> class connection_pool { + friend pool_connection; + private: std::mutex connection_pool_mutex; const std::shared_ptr config; - unsigned int maximum_pool_size = 0; + size_t maximum_pool_size = 0; std::stack> free_connections; - - struct async_connection - { - friend class connection_pool; - - private: - std::unique_ptr _impl; - connection_pool* origin; - - public: - async_connection(std::unique_ptr& connection, connection_pool* origin) : - _impl(std::move(connection)), origin(origin) {} - - ~async_connection() - { - origin->free_connection(_impl); - } - - template - auto operator()(Args&&... args) -> decltype(_impl->args(std::forward(args)...)) - { - return _impl->args(std::forward(args)...); - } - - Connection* operator->() - { - return &_impl; - } - - async_connection(const async_connection&) = delete; - async_connection(async_connection&& other) - : _impl(std::move(other._impl)), origin(other.origin) {} - async_connection& operator=(const async_connection&) = delete; - async_connection& operator=(async_connection&&) = delete; - }; + Reconnect_policy reconnect_policy; void free_connection(std::unique_ptr& connection) { @@ -107,28 +145,30 @@ namespace sqlpp } public: - connection_pool(const std::shared_ptr& config, unsigned int pool_size) - : config(config), maximum_pool_size(pool_size) - {} + connection_pool(const std::shared_ptr& config, size_t pool_size, + Reconnect_policy reconnect_policy = sqlpp::reconnect_policy::auto_reconnect()) + : config(config), maximum_pool_size(pool_size), reconnect_policy(reconnect_policy) {} ~connection_pool() = default; connection_pool(const connection_pool&) = delete; - connection_pool(connection_pool&&) = delete; + connection_pool(connection_pool&& other) + : config(std::move(other.config)), maximum_pool_size(std::move(other.maximum_pool_size)), + reconnect_policy(std::move(other.reconnect_policy)) {} connection_pool& operator=(const connection_pool&) = delete; connection_pool& operator=(connection_pool&&) = delete; - async_connection get_connection() + pool_connection get_connection() { std::lock_guard lock(connection_pool_mutex); if (!free_connections.empty()) { auto connection = std::move(free_connections.top()); free_connections.pop(); - return async_connection(connection, this); + return pool_connection(connection, this); } try { - return async_connection(std::move(std::make_unique(config)), this); + return pool_connection(std::move(std::make_unique(config)), this); } catch (const sqlpp::exception& e) { @@ -138,6 +178,18 @@ namespace sqlpp } } }; + + template::value, Connection_config::connection>::type, + typename Reconnect_policy = reconnect_policy::auto_reconnect> + connection_pool make_connection_pool( + const std::shared_ptr& config, + size_t max_pool_size, + Reconnect_policy reconnect_policy = reconnect_policy::auto_reconnect() + ) + { + return connection_pool(config, max_pool_size, reconnect_policy); + } } #endif diff --git a/include/sqlpp11/pool_connection.h b/include/sqlpp11/pool_connection.h new file mode 100644 index 00000000..426e0c83 --- /dev/null +++ b/include/sqlpp11/pool_connection.h @@ -0,0 +1,76 @@ +/* +* Copyright (c) 2013 - 2017, Roland Bock, Frank Park +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without modification, +* are permitted provided that the following conditions are met: +* +* Redistributions of source code must retain the above copyright notice, this +* list of conditions and the following disclaimer. +* +* Redistributions in binary form must reproduce the above copyright notice, this +* list of conditions and the following disclaimer in the documentation and/or +* other materials provided with the distribution. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#ifndef SQLPP_POOL_CONNECTION_H +#define SQLPP_POOL_CONNECTION_H + +#include + +namespace sqlpp +{ + template > + struct pool_connection + { + private: + std::unique_ptr _impl; + Connection_pool* origin; + + public: + pool_connection(std::unique_ptr& connection, Connection_pool* origin) + : _impl(std::move(connection)), origin(origin) {} + + ~pool_connection() + { + origin->free_connection(_impl); + } + + template + auto operator()(Args&&... args) -> decltype(_impl->args(std::forward(args)...)) + { + return _impl->args(std::forward(args)...); + } + + template + auto operator()(const T& t) -> decltype(_impl->run(t)) + { + return _impl->run(t); + } + + Connection* operator->() + { + return &_impl; + } + + pool_connection(const pool_connection&) = delete; + pool_connection(pool_connection&& other) + : _impl(std::move(other._impl)), origin(other.origin) {} + pool_connection& operator=(const pool_connection&) = delete; + pool_connection& operator=(pool_connection&&) = delete; + }; +} + +#endif