shard: distributed log implementation and shard can use it with a flag set

This commit is contained in:
Miroslav Crnic
2024-03-12 11:02:04 +00:00
committed by GitHub Enterprise
parent d5fb66b694
commit b240de53b5
32 changed files with 4697 additions and 145 deletions

View File

@@ -1,4 +1,5 @@
#include <chrono>
#include <cstring>
#include <fstream>
#include <memory>
#include <mutex>
@@ -21,6 +22,7 @@
#include "Env.hpp"
#include "Exception.hpp"
#include "Msgs.hpp"
#include "MsgsGen.hpp"
#include "Shard.hpp"
#include "Time.hpp"
#include "CDCDB.hpp"
@@ -38,6 +40,8 @@
struct CDCShared {
CDCDB& db;
std::array<std::atomic<uint16_t>, 2> ownPorts;
std::mutex replicasLock;
std::array<AddrsInfo, 5> replicas;
std::mutex shardsMutex;
std::array<ShardInfo, 256> shards;
// How long it took us to process the entire request, from parse to response.
@@ -826,39 +830,77 @@ public:
struct CDCRegisterer : PeriodicLoop {
CDCShared& _shared;
uint32_t _ownIp1;
uint32_t _ownIp2;
std::string _shuckleHost;
uint16_t _shucklePort;
bool _hasSecondIp;
XmonNCAlert _alert;
ReplicaId _replicaId;
ReplicaId _leaderReplicaId;
AddrsInfo _info;
bool _infoLoaded;
bool _registerCompleted;
public:
CDCRegisterer(Logger& logger, std::shared_ptr<XmonAgent>& xmon, const CDCOptions& options, CDCShared& shared):
PeriodicLoop(logger, xmon, "registerer", { 1_sec, 1_mins }),
_shared(shared),
_ownIp1(options.ipPorts[0].ip),
_ownIp2(options.ipPorts[1].ip),
_shuckleHost(options.shuckleHost),
_shucklePort(options.shucklePort),
_hasSecondIp(options.ipPorts[1].ip != 0),
_alert(10_sec)
{}
_alert(10_sec),
_replicaId(options.replicaId),
_leaderReplicaId(options.leaderReplicaId),
_infoLoaded(false),
_registerCompleted(false)
{
uint32_t ip1 = options.ipPorts[0].ip;
uint32_t ip2 = options.ipPorts[1].ip;
uint32_t ip = htonl(ip1);
memcpy(_info.ip1.data.data(), &ip, 4);
ip = htonl(ip2);
memcpy(_info.ip2.data.data(), &ip, 4);
}
virtual ~CDCRegisterer() = default;
virtual bool periodicStep() override {
uint16_t port1 = _shared.ownPorts[0].load();
uint16_t port2 = _shared.ownPorts[1].load();
if (port1 == 0 || (_hasSecondIp && port2 == 0)) {
return false;
if (unlikely(!_infoLoaded)) {
uint16_t port1 = _shared.ownPorts[0].load();
uint16_t port2 = _shared.ownPorts[1].load();
if (port1 == 0 || (_hasSecondIp && port2 == 0)) {
return false;
}
_info.port1 = port1;
_info.port2 = port2;
_infoLoaded = true;
}
LOG_DEBUG(_env, "Registering ourselves (CDC, %s:%s, %s:%s) with shuckle", in_addr{htonl(_ownIp1)}, port1, in_addr{htonl(_ownIp2)}, port2);
std::string err = registerCDC(_shuckleHost, _shucklePort, 10_sec, _ownIp1, port1, _ownIp2, port2);
std::string err;
if(likely(_registerCompleted)) {
std::array<AddrsInfo, 5> replicas;
LOG_INFO(_env, "Fetching replicas for CDC from shuckle");
err = fetchCDCReplicas(_shuckleHost, _shucklePort, 10_sec, replicas);
if (!err.empty()) {
_env.updateAlert(_alert, "Failed getting CDC replicas from shuckle: %s", err);
return false;
}
if (_info != replicas[_replicaId.u8]) {
_env.updateAlert(_alert, "AddrsInfo in shuckle: %s , not matching local AddrsInfo: %s", replicas[_replicaId.u8], _info);
return false;
}
{
std::lock_guard guard(_shared.replicasLock);
_shared.replicas = replicas;
}
}
LOG_DEBUG(_env, "Registering ourselves (CDC %s, %s) with shuckle", _replicaId, _info);
err = registerCDCReplica(_shuckleHost, _shucklePort, 10_sec, _replicaId, _replicaId == _leaderReplicaId, _info);
if (!err.empty()) {
_env.updateAlert(_alert, "Couldn't register ourselves with shuckle: %s", err);
return false;
}
_env.clearAlert(_alert);
_registerCompleted = true;
return true;
}
};
@@ -1018,6 +1060,8 @@ void runCDC(const std::string& dbDir, const CDCOptions& options) {
LOG_INFO(env, "Running CDC with options:");
LOG_INFO(env, " level = %s", options.logLevel);
LOG_INFO(env, " logFile = '%s'", options.logFile);
LOG_INFO(env, " replicaId = %s", options.replicaId);
LOG_INFO(env, " leaderReplicaId = %s", options.leaderReplicaId);
LOG_INFO(env, " port = %s", options.port);
LOG_INFO(env, " shuckleHost = '%s'", options.shuckleHost);
LOG_INFO(env, " shucklePort = %s", options.shucklePort);