Added a connection pool (#11)

L
This commit is contained in:
Dr. Patrick Urbanke (劉自成)
2025-05-25 10:42:40 +02:00
committed by GitHub
parent 622c44efbb
commit 80f5e84a42
7 changed files with 510 additions and 2 deletions
+2
View File
@@ -1,6 +1,7 @@
#ifndef SQLGEN_HPP_
#define SQLGEN_HPP_
#include "sqlgen/ConnectionPool.hpp"
#include "sqlgen/Flatten.hpp"
#include "sqlgen/Iterator.hpp"
#include "sqlgen/IteratorBase.hpp"
@@ -10,6 +11,7 @@
#include "sqlgen/Range.hpp"
#include "sqlgen/Ref.hpp"
#include "sqlgen/Result.hpp"
#include "sqlgen/Session.hpp"
#include "sqlgen/Timestamp.hpp"
#include "sqlgen/Varchar.hpp"
#include "sqlgen/begin_transaction.hpp"
+108
View File
@@ -0,0 +1,108 @@
#ifndef SQLGEN_CONNECTIONPOOL_HPP_
#define SQLGEN_CONNECTIONPOOL_HPP_
#include <atomic>
#include <chrono>
#include <memory>
#include <numeric>
#include <thread>
#include <utility>
#include <vector>
#include "Ref.hpp"
#include "Result.hpp"
#include "Session.hpp"
namespace sqlgen {
struct ConnectionPoolConfig {
size_t size = 4;
size_t num_attempts = 10;
size_t wait_time_in_seconds = 1;
};
template <class Connection>
class ConnectionPool {
using ConnPtr = Ref<Connection>;
public:
template <class... Args>
ConnectionPool(const ConnectionPoolConfig& _config, const Args&... _args) {
conns_->reserve(_config.size);
for (size_t i = 0; i < _config.size; ++i) {
auto conn = Ref<Connection>::make(_args...);
auto flag = Ref<std::atomic_flag>::make();
flag->clear();
conns_->emplace_back(std::make_pair(std::move(conn), std::move(flag)));
}
}
template <class... Args>
static Result<ConnectionPool> make(const ConnectionPoolConfig& _config,
const Args&... _args) noexcept {
try {
return ConnectionPool(_config, _args...);
} catch (std::exception& e) {
return error(e.what());
}
}
~ConnectionPool() = default;
/// Acquire a session from the pool. Returns an error if no connections are
/// available after several attempts.
Result<Ref<Session<Connection>>> acquire() noexcept {
for (size_t att = 0; att < config_.num_attempts; ++att) {
if (att != 0) {
std::this_thread::sleep_for(
std::chrono::seconds(config_.wait_time_in_seconds));
}
for (auto& [conn, flag] : *conns_) {
if (!flag->test_and_set()) {
return Ref<Session<Connection>>::make(conn, flag);
}
}
}
return error("No available connections in the pool.");
}
/// Get the current number of available connections
size_t available() const {
return std::accumulate(conns_->begin(), conns_->end(), 0,
[](const auto _count, const auto& _p) {
return _p.second->test() ? _count : _count + 1;
});
}
/// Get the total number of connections in the pool
size_t size() const { return conns_->size(); }
private:
/// The configuration for the connection pool.
ConnectionPoolConfig config_;
/// The underlying connection objects.
Ref<std::vector<std::pair<ConnPtr, Ref<std::atomic_flag>>>> conns_;
};
template <class Connection, class... Args>
Result<ConnectionPool<Connection>> make_connection_pool(
const ConnectionPoolConfig& _config, const Args&... _args) noexcept {
return ConnectionPool<Connection>::make(_config, _args...);
}
template <class Connection>
Result<Ref<Session<Connection>>> session(
ConnectionPool<Connection> _pool) noexcept {
return _pool.acquire();
}
template <class Connection>
Result<Ref<Session<Connection>>> session(
Result<ConnectionPool<Connection>> _res) noexcept {
return _res.and_then([](auto _pool) { return session(_pool); });
}
} // namespace sqlgen
#endif
+97
View File
@@ -0,0 +1,97 @@
#ifndef SQLGEN_SESSION_HPP_
#define SQLGEN_SESSION_HPP_
#include <atomic>
#include <memory>
#include <optional>
#include <vector>
#include "IteratorBase.hpp"
#include "Ref.hpp"
#include "dynamic/Insert.hpp"
#include "dynamic/SelectFrom.hpp"
#include "dynamic/Statement.hpp"
#include "dynamic/Write.hpp"
namespace sqlgen {
template <class Connection>
class Session {
public:
using ConnPtr = Ref<Connection>;
Session(const Ref<Connection>& _conn, const Ref<std::atomic_flag>& _flag)
: conn_(_conn), flag_(_flag.ptr()) {}
Session(const Session<Connection>& _other) = delete;
Session(Session<Connection>&& _other)
: conn_(std::move(_other.conn_)), flag_(_other.flag_) {
_other.flag_.reset();
}
~Session() {
if (flag_) {
flag_->clear();
}
}
Result<Nothing> begin_transaction() { return conn_->begin_transaction(); }
Result<Nothing> commit() { return conn_->commit(); }
Result<Nothing> execute(const std::string& _sql) {
return conn_->execute(_sql);
}
Result<Nothing> insert(
const dynamic::Insert& _stmt,
const std::vector<std::vector<std::optional<std::string>>>& _data) {
return conn_->insert(_stmt, _data);
}
Session& operator=(const Session& _other) = delete;
Session& operator=(Session&& _other) noexcept {
if (this == &_other) {
return *this;
}
conn_ = std::move(_other.conn_);
flag_ = _other.flag_;
_other.flag_.reset();
return *this;
}
Result<Ref<IteratorBase>> read(const dynamic::SelectFrom& _query) {
return conn_->read(_query);
}
Result<Nothing> rollback() noexcept { return conn_->rollback(); }
std::string to_sql(const dynamic::Statement& _stmt) noexcept {
return conn_->to_sql(_stmt);
}
Result<Nothing> start_write(const dynamic::Write& _stmt) {
return conn_->start_write(_stmt);
}
Result<Nothing> end_write() { return conn_->end_write(); }
Result<Nothing> write(
const std::vector<std::vector<std::optional<std::string>>>& _data) {
return conn_->write(_data);
}
private:
/// The underlying connection object.
ConnPtr conn_;
/// The flag corresponding to the object - as long as this is true, we have
/// ownership.
std::shared_ptr<std::atomic_flag> flag_;
};
} // namespace sqlgen
#endif
+2 -2
View File
@@ -67,7 +67,7 @@ class Transaction {
if (this == &_other) {
return *this;
}
conn_ = _other.conn;
conn_ = _other.conn_;
transaction_ended_ = _other.transaction_ended_;
_other.transaction_ended_ = true;
return *this;
@@ -92,7 +92,7 @@ class Transaction {
}
Result<Nothing> start_write(const dynamic::Write& _stmt) {
return conn_->to_sql(_stmt);
return conn_->start_write(_stmt);
}
Result<Nothing> end_write() { return conn_->end_write(); }