mirror of
https://github.com/XTXMarkets/ternfs.git
synced 2026-01-06 02:49:45 -06:00
Convert build system to cmake
Also, produce fully static binaries. This means that `gethostname` does not work (doesn't work with static glibc unless you build it with `--enable-static-nss`, which no distro builds glibc with).
This commit is contained in:
528
cpp/cdc/CDC.cpp
Normal file
528
cpp/cdc/CDC.cpp
Normal file
@@ -0,0 +1,528 @@
|
||||
#include <chrono>
|
||||
#include <fstream>
|
||||
#include <memory>
|
||||
#include <netinet/in.h>
|
||||
#include <netinet/ip.h>
|
||||
#include <sys/socket.h>
|
||||
#include <atomic>
|
||||
#include <sys/epoll.h>
|
||||
#include <fcntl.h>
|
||||
#include <optional>
|
||||
#include <thread>
|
||||
#include <unordered_map>
|
||||
#include <arpa/inet.h>
|
||||
|
||||
#include "Bincode.hpp"
|
||||
#include "CDC.hpp"
|
||||
#include "CDCDB.hpp"
|
||||
#include "Env.hpp"
|
||||
#include "Exception.hpp"
|
||||
#include "Msgs.hpp"
|
||||
#include "MsgsGen.hpp"
|
||||
#include "Time.hpp"
|
||||
#include "Undertaker.hpp"
|
||||
#include "CDCDB.hpp"
|
||||
#include "Crypto.hpp"
|
||||
#include "CDCKey.hpp"
|
||||
#include "splitmix64.hpp"
|
||||
#include "Shuckle.hpp"
|
||||
|
||||
struct InFlightShardRequest {
|
||||
uint64_t txnId; // the txn id that requested this shard request
|
||||
EggsTime sentAt;
|
||||
uint64_t shardRequestId;
|
||||
ShardId shid;
|
||||
};
|
||||
|
||||
struct InFlightCDCRequest {
|
||||
uint64_t cdcRequestId;
|
||||
struct sockaddr_in clientAddr;
|
||||
CDCMessageKind kind;
|
||||
};
|
||||
|
||||
struct CDCServer : Undertaker::Reapable {
|
||||
private:
|
||||
Env _env;
|
||||
std::atomic<bool> _stop;
|
||||
std::string _shuckleAddr;
|
||||
uint16_t _shucklePort;
|
||||
uint16_t _port;
|
||||
std::vector<ShardInfo> _shards;
|
||||
CDCDB& _db;
|
||||
uint64_t _currentLogIndex;
|
||||
std::vector<char> _recvBuf;
|
||||
std::vector<char> _sendBuf;
|
||||
CDCReqContainer _cdcReqContainer;
|
||||
ShardRespContainer _shardRespContainer;
|
||||
CDCStep _step;
|
||||
uint64_t _shardRequestIdCounter;
|
||||
uint64_t _packetDropRand;
|
||||
uint64_t _packetDropProbability; // probability * 10,000
|
||||
std::array<int, 257> _socks;
|
||||
AES128Key _expandedCDCKey;
|
||||
// The requests we've enqueued, but haven't completed yet, with
|
||||
// where to send the response. Indexed by txn id.
|
||||
std::unordered_map<uint64_t, InFlightCDCRequest> _inFlightTxns;
|
||||
// The _shard_ request we're currently waiting for, if any.
|
||||
std::optional<InFlightShardRequest> _inFlightShardReq;
|
||||
|
||||
public:
|
||||
CDCServer(Logger& logger, const CDCOptions& options, std::vector<ShardInfo>&& shards, CDCDB& db) :
|
||||
_env(logger, "req_server"),
|
||||
_stop(false),
|
||||
_shuckleAddr(options.shuckleAddr),
|
||||
_shucklePort(options.shucklePort),
|
||||
_port(options.port),
|
||||
_shards(std::move(shards)),
|
||||
_db(db),
|
||||
_recvBuf(UDP_MTU),
|
||||
_sendBuf(UDP_MTU),
|
||||
_shardRequestIdCounter(0),
|
||||
_packetDropRand(0),
|
||||
_packetDropProbability(0)
|
||||
{
|
||||
if (options.simulatePacketDrop != 0.0) {
|
||||
LOG_INFO(_env, "will drop %s%% of packets", options.simulatePacketDrop*100.0);
|
||||
_packetDropProbability = options.simulatePacketDrop * 10'000.0;
|
||||
ALWAYS_ASSERT(_packetDropProbability > 0 && _packetDropProbability < 10'000);
|
||||
}
|
||||
_currentLogIndex = _db.lastAppliedLogEntry();
|
||||
memset(&_socks[0], 0, sizeof(_socks));
|
||||
expandKey(CDCKey, _expandedCDCKey);
|
||||
}
|
||||
|
||||
virtual ~CDCServer() = default;
|
||||
|
||||
virtual void terminate() override {
|
||||
_env.flush();
|
||||
_stop.store(true);
|
||||
}
|
||||
|
||||
virtual void onAbort() override {
|
||||
_env.flush();
|
||||
}
|
||||
|
||||
void registerWithShuckle() {
|
||||
|
||||
}
|
||||
|
||||
void run() {
|
||||
// Create sockets
|
||||
// First sock: the CDC sock
|
||||
// Next 256 socks: the socks we use to communicate with the shards
|
||||
for (int i = 0; i < _socks.size(); i++) {
|
||||
int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
|
||||
_socks[i] = sock;
|
||||
if (sock < 0) {
|
||||
throw SYSCALL_EXCEPTION("cannot create socket");
|
||||
}
|
||||
if (fcntl(sock, F_SETFL, O_NONBLOCK) == -1) {
|
||||
throw SYSCALL_EXCEPTION("fcntl");
|
||||
}
|
||||
struct sockaddr_in addr;
|
||||
addr.sin_family = AF_INET;
|
||||
addr.sin_addr.s_addr = htonl(INADDR_ANY);
|
||||
if (i == 0 && _port != 0) { // CDC
|
||||
addr.sin_port = htons(_port);
|
||||
} else { // just to communicate with the shard
|
||||
addr.sin_port = 0;
|
||||
}
|
||||
if (bind(sock, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
|
||||
if (i == 0 && _port != 0) {
|
||||
throw SYSCALL_EXCEPTION("cannot bind socket to port %s", _port);
|
||||
} else {
|
||||
throw SYSCALL_EXCEPTION("cannot bind socket");
|
||||
}
|
||||
}
|
||||
{
|
||||
socklen_t addrLen = sizeof(addr);
|
||||
if (getsockname(sock, (struct sockaddr*)&addr, &addrLen) < 0) {
|
||||
throw SYSCALL_EXCEPTION("getsockname");
|
||||
}
|
||||
}
|
||||
if (i == 0) {
|
||||
LOG_DEBUG(_env, "bound CDC sock to port %s", ntohs(addr.sin_port));
|
||||
_port = ntohs(addr.sin_port);
|
||||
} else {
|
||||
LOG_DEBUG(_env, "bound shard %s sock to port %s", i-1, ntohs(addr.sin_port));
|
||||
}
|
||||
}
|
||||
_registerWithShuckle();
|
||||
|
||||
// create epoll structure
|
||||
int epoll = epoll_create1(0);
|
||||
if (epoll < 0) {
|
||||
throw SYSCALL_EXCEPTION("epoll");
|
||||
}
|
||||
struct epoll_event events[_socks.size()];
|
||||
for (int i = 0; i < _socks.size(); i++) {
|
||||
auto& event = events[i];
|
||||
event.data.u64 = i;
|
||||
event.events = EPOLLIN | EPOLLET;
|
||||
if (epoll_ctl(epoll, EPOLL_CTL_ADD, _socks[i], &event) == -1) {
|
||||
throw SYSCALL_EXCEPTION("epoll_ctl");
|
||||
}
|
||||
}
|
||||
|
||||
LOG_INFO(_env, "running on port %s", _port);
|
||||
|
||||
// If we've got a dangling transaction, immediately start processing it
|
||||
_db.startNextTransaction(true, eggsNow(), _advanceLogIndex(), _step);
|
||||
_processStep(_step);
|
||||
|
||||
// Start processing CDC requests and shard responses
|
||||
for (;;) {
|
||||
if (_stop.load()) {
|
||||
LOG_DEBUG(_env, "got told to stop, stopping");
|
||||
break;
|
||||
}
|
||||
|
||||
// timeout after 100ms
|
||||
if (_inFlightShardReq && (eggsNow() - _inFlightShardReq->sentAt) > 100_ms) {
|
||||
_inFlightShardReq.reset();
|
||||
_handleShardError(_inFlightShardReq->shid, EggsError::TIMEOUT);
|
||||
}
|
||||
|
||||
// 1ms timeout for prompt termination and for shard resps timeouts
|
||||
int nfds = epoll_wait(epoll, events, _socks.size(), 1 /*milliseconds*/);
|
||||
if (nfds < 0) {
|
||||
throw SYSCALL_EXCEPTION("epoll_wait");
|
||||
}
|
||||
|
||||
for (int i = 0; i < nfds; i++) {
|
||||
const auto& event = events[i];
|
||||
if (event.data.u64 == 0) {
|
||||
_drainCDCSock();
|
||||
} else {
|
||||
_drainShardSock(ShardId(event.data.u64-1));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_db.close();
|
||||
}
|
||||
|
||||
private:
|
||||
void _registerWithShuckle() {
|
||||
ALWAYS_ASSERT(_port != 0);
|
||||
for (;;) {
|
||||
if (_stop.load()) {
|
||||
return;
|
||||
}
|
||||
std::array<uint8_t, 4> addr{127,0,0,1};
|
||||
std::string err = registerCDC(_shuckleAddr, _shucklePort, 100_ms, addr, _port);
|
||||
if (!err.empty()) {
|
||||
RAISE_ALERT(_env, "Couldn't register ourselves with shuckle: %s", err);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
LOG_INFO(_env, "Successfully registered with shuckle");
|
||||
}
|
||||
|
||||
|
||||
void _drainCDCSock() {
|
||||
int sock = _socks[0];
|
||||
|
||||
struct sockaddr_in clientAddr;
|
||||
|
||||
for (;;) {
|
||||
// Read one request
|
||||
memset(&clientAddr, 0, sizeof(clientAddr));
|
||||
socklen_t addrLen = sizeof(clientAddr);
|
||||
ssize_t read = recvfrom(sock, &_recvBuf[0], _recvBuf.size(), 0, (struct sockaddr*)&clientAddr, &addrLen);
|
||||
if (read < 0 && errno == EAGAIN) {
|
||||
return;
|
||||
}
|
||||
if (read < 0) {
|
||||
throw SYSCALL_EXCEPTION("recvfrom");
|
||||
}
|
||||
LOG_DEBUG(_env, "received CDC request from %s", clientAddr);
|
||||
|
||||
if (splitmix64(_packetDropRand) % 10'000 < _packetDropProbability) {
|
||||
LOG_DEBUG(_env, "artificially dropping packet");
|
||||
continue;
|
||||
}
|
||||
|
||||
BincodeBuf reqBbuf(&_recvBuf[0], read);
|
||||
|
||||
// First, try to parse the header
|
||||
CDCRequestHeader reqHeader;
|
||||
try {
|
||||
reqHeader.unpack(reqBbuf);
|
||||
} catch (BincodeException err) {
|
||||
LOG_ERROR(_env, "%s\nstacktrace:\n%s", err.what(), err.getStackTrace());
|
||||
RAISE_ALERT(_env, "could not parse request header from %s, dropping it.", clientAddr);
|
||||
continue;
|
||||
}
|
||||
|
||||
LOG_DEBUG(_env, "received request id %s, kind %s", reqHeader.requestId, reqHeader.kind);
|
||||
|
||||
// If this will be filled in with an actual code, it means that we couldn't process
|
||||
// the request.
|
||||
EggsError err = NO_ERROR;
|
||||
|
||||
// Now, try to parse the body
|
||||
try {
|
||||
_cdcReqContainer.unpack(reqBbuf, reqHeader.kind);
|
||||
LOG_DEBUG(_env, "parsed request: %s", _cdcReqContainer);
|
||||
} catch (BincodeException exc) {
|
||||
LOG_ERROR(_env, "%s\nstacktrace:\n%s", exc.what(), exc.getStackTrace());
|
||||
RAISE_ALERT(_env, "could not parse CDC request of kind %s from %s, will reply with error.", reqHeader.kind, clientAddr);
|
||||
err = EggsError::MALFORMED_REQUEST;
|
||||
}
|
||||
|
||||
// Make sure nothing is left
|
||||
if (err == NO_ERROR && reqBbuf.remaining() != 0) {
|
||||
RAISE_ALERT(_env, "%s bytes remaining after parsing CDC request of kind %s from %s, will reply with error", reqBbuf.remaining(), reqHeader.kind, clientAddr);
|
||||
err = EggsError::MALFORMED_REQUEST;
|
||||
}
|
||||
|
||||
if (err == NO_ERROR) {
|
||||
// If things went well, process the request
|
||||
LOG_DEBUG(_env, "CDC request %s successfully parsed, will now process", _cdcReqContainer.kind());
|
||||
uint64_t txnId = _db.processCDCReq(true, eggsNow(), _advanceLogIndex(), _cdcReqContainer, _step);
|
||||
auto& inFlight = _inFlightTxns[txnId];
|
||||
inFlight.cdcRequestId = reqHeader.requestId;
|
||||
inFlight.clientAddr = clientAddr;
|
||||
inFlight.kind = reqHeader.kind;
|
||||
// Go forward
|
||||
_processStep(_step);
|
||||
} else {
|
||||
// Otherwise we can immediately reply with an error
|
||||
RAISE_ALERT(_env, "request %s failed before enqueue with error %s", _cdcReqContainer.kind(), err);
|
||||
_sendError(reqHeader.requestId, err, clientAddr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void _drainShardSock(ShardId shid) {
|
||||
for (;;) {
|
||||
int sock = _socks[(int)shid.u8 + 1];
|
||||
ssize_t read = recv(sock, &_recvBuf[0], _recvBuf.size(), 0);
|
||||
if (read < 0 && errno == EAGAIN) {
|
||||
return;
|
||||
}
|
||||
if (read < 0) {
|
||||
throw SYSCALL_EXCEPTION("recv");
|
||||
}
|
||||
|
||||
LOG_DEBUG(_env, "received response from shard %s", shid);
|
||||
|
||||
if (splitmix64(_packetDropRand) % 10'000 < _packetDropProbability) {
|
||||
LOG_DEBUG(_env, "artificially dropping packet");
|
||||
continue;
|
||||
}
|
||||
|
||||
BincodeBuf reqBbuf(&_recvBuf[0], read);
|
||||
|
||||
ShardResponseHeader respHeader;
|
||||
try {
|
||||
respHeader.unpack(reqBbuf);
|
||||
} catch (BincodeException err) {
|
||||
LOG_ERROR(_env, "%s\nstacktrace:\n%s", err.what(), err.getStackTrace());
|
||||
RAISE_ALERT(_env, "could not parse response header, dropping response");
|
||||
continue;
|
||||
}
|
||||
|
||||
LOG_DEBUG(_env, "received response id %s, kind %s", respHeader.requestId, respHeader.kind);
|
||||
|
||||
// Note that below we just let the BincodeExceptions propagate upwards since we
|
||||
// control all the code in this codebase, and the header is good, and we're a
|
||||
// bit lazy.
|
||||
|
||||
// If it's not the request we wanted, skip
|
||||
if (!_inFlightShardReq) {
|
||||
LOG_INFO(_env, "got unexpected shard request id %s, kind %s, from shard %s, dropping", respHeader.requestId, respHeader.kind, shid);
|
||||
continue;
|
||||
}
|
||||
if (_inFlightShardReq->shardRequestId != respHeader.requestId) {
|
||||
LOG_INFO(_env, "got unexpected shard request id %s (expected %s), kind %s, from shard %s, dropping", respHeader.requestId, _inFlightShardReq->shardRequestId, respHeader.kind, shid);
|
||||
continue;
|
||||
}
|
||||
uint64_t txnId = _inFlightShardReq->txnId;
|
||||
|
||||
// We can forget about this, we're going to process it right now
|
||||
_inFlightShardReq.reset();
|
||||
|
||||
// We got an error
|
||||
if (respHeader.kind == (ShardMessageKind)0) {
|
||||
EggsError err = reqBbuf.unpackScalar<EggsError>();
|
||||
_handleShardError(shid, err);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Otherwise, parse the body
|
||||
_shardRespContainer.unpack(reqBbuf, respHeader.kind);
|
||||
LOG_DEBUG(_env, "parsed shard response: %s", _shardRespContainer);
|
||||
ALWAYS_ASSERT(reqBbuf.remaining() == 0);
|
||||
|
||||
// If all went well, advance with the newly received request
|
||||
LOG_DEBUG(_env, "successfully parsed shard response %s with kind %s, will now process: %s", respHeader.requestId, respHeader.kind, _shardRespContainer);
|
||||
_db.processShardResp(true, eggsNow(), _advanceLogIndex(), NO_ERROR, &_shardRespContainer, _step);
|
||||
_processStep(_step);
|
||||
}
|
||||
}
|
||||
|
||||
void _handleShardError(ShardId shid, EggsError err) {
|
||||
RAISE_ALERT(_env, "got shard error %s from shard %s", err, shid);
|
||||
_db.processShardResp(true, eggsNow(), _advanceLogIndex(), err, nullptr, _step);
|
||||
_processStep(_step);
|
||||
}
|
||||
|
||||
void _processStep(const CDCStep& step) {
|
||||
LOG_DEBUG(_env, "processing step %s", step);
|
||||
if (step.txnFinished != 0) {
|
||||
LOG_DEBUG(_env, "txn %s finished", step.txnFinished);
|
||||
// we need to send the response back to the client
|
||||
auto inFlight = _inFlightTxns.find(step.txnFinished);
|
||||
if (inFlight == _inFlightTxns.end()) {
|
||||
RAISE_ALERT(_env, "Could not find in-flight request %s, this might be because the CDC was restarted in the middle of a transaction.", step.txnFinished);
|
||||
} else {
|
||||
if (step.err != NO_ERROR) {
|
||||
RAISE_ALERT(_env, "txn %s, req id %s, finished with error %s", step.txnFinished, inFlight->second.cdcRequestId, step.err);
|
||||
_sendError(inFlight->second.cdcRequestId, step.err, inFlight->second.clientAddr);
|
||||
} else {
|
||||
LOG_DEBUG(_env, "sending response with req id %s, kind %s, back to %s", inFlight->second.cdcRequestId, inFlight->second.kind, inFlight->second.clientAddr);
|
||||
BincodeBuf bbuf(&_sendBuf[0], _sendBuf.size());
|
||||
CDCResponseHeader respHeader(inFlight->second.cdcRequestId, inFlight->second.kind);
|
||||
respHeader.pack(bbuf);
|
||||
step.resp.pack(bbuf);
|
||||
_send(_socks[0], inFlight->second.clientAddr, (const char*)bbuf.data, bbuf.len());
|
||||
}
|
||||
_inFlightTxns.erase(inFlight);
|
||||
}
|
||||
}
|
||||
if (step.txnNeedsShard != 0) {
|
||||
LOG_DEBUG(_env, "txn %s needs shard %s, req %s", step.txnNeedsShard, step.shardReq.shid, step.shardReq.req);
|
||||
BincodeBuf bbuf(&_sendBuf[0], _sendBuf.size());
|
||||
// Header
|
||||
ShardRequestHeader shardReqHeader;
|
||||
shardReqHeader.requestId = _shardRequestIdCounter;
|
||||
_shardRequestIdCounter++;
|
||||
shardReqHeader.kind = step.shardReq.req.kind();
|
||||
shardReqHeader.pack(bbuf);
|
||||
// Body
|
||||
step.shardReq.req.pack(bbuf);
|
||||
// MAC, if necessary
|
||||
if (isPrivilegedRequestKind(shardReqHeader.kind)) {
|
||||
bbuf.packFixedBytes<8>({cbcmac(_expandedCDCKey, bbuf.data, bbuf.len())});
|
||||
}
|
||||
// Send
|
||||
struct sockaddr_in shardAddr;
|
||||
memset(&shardAddr, 0, sizeof(shardAddr));
|
||||
const auto& shardInfo = _shards[step.shardReq.shid.u8];
|
||||
shardAddr.sin_family = AF_INET;
|
||||
shardAddr.sin_port = htons(shardInfo.port);
|
||||
static_assert(sizeof(shardAddr.sin_addr) == sizeof(shardInfo.ip));
|
||||
memcpy(&shardAddr.sin_addr, shardInfo.ip.data.data(), sizeof(shardAddr.sin_addr));
|
||||
LOG_DEBUG(_env, "sending request with req id %s to shard %s (%s)", shardReqHeader.requestId, step.shardReq.shid, shardAddr);
|
||||
_send(_socks[(int)step.shardReq.shid.u8 + 1], shardAddr, (const char*)bbuf.data, bbuf.len());
|
||||
// Record the in-flight req
|
||||
ALWAYS_ASSERT(!_inFlightShardReq);
|
||||
auto& inFlight = _inFlightShardReq.emplace();
|
||||
inFlight.shardRequestId = shardReqHeader.requestId;
|
||||
inFlight.sentAt = eggsNow();
|
||||
inFlight.txnId = step.txnNeedsShard;
|
||||
inFlight.shid = step.shardReq.shid;
|
||||
}
|
||||
if (step.nextTxn != 0) {
|
||||
LOG_DEBUG(_env, "we have txn %s lined up, starting it", step.nextTxn);
|
||||
_db.startNextTransaction(true, eggsNow(), _advanceLogIndex(), _step);
|
||||
_processStep(_step);
|
||||
}
|
||||
}
|
||||
|
||||
void _sendError(uint64_t requestId, EggsError err, struct sockaddr_in& clientAddr) {
|
||||
BincodeBuf respBbuf(&_sendBuf[0], _sendBuf.size());
|
||||
CDCResponseHeader(requestId, CDCMessageKind::ERROR).pack(respBbuf);
|
||||
respBbuf.packScalar<uint16_t>((uint16_t)err);
|
||||
// We're sending an error back to a user, so always send it from the CDC sock.
|
||||
_send(_socks[0], clientAddr, (const char*)respBbuf.data, respBbuf.len());
|
||||
LOG_DEBUG(_env, "sent error %s to %s", err, clientAddr);
|
||||
}
|
||||
|
||||
void _send(int sock, struct sockaddr_in& dest, const char* data, size_t len) {
|
||||
// TODO we might very well get EAGAIN here since these are non-blocking sockets.
|
||||
// We should probably come up with a better strategy regarding writing stuff out,
|
||||
// but, being a bit lazy for now.
|
||||
if (sendto(sock, data, len, 0, (struct sockaddr*)&dest, sizeof(dest)) != len) {
|
||||
throw SYSCALL_EXCEPTION("sendto");
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t _advanceLogIndex() {
|
||||
return ++_currentLogIndex;
|
||||
}
|
||||
};
|
||||
|
||||
static void* runCDCServer(void* server) {
|
||||
((CDCServer*)server)->run();
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
std::vector<ShardInfo> lookupShardInfo(Logger& logger, const CDCOptions& options) {
|
||||
Env env(logger, "lookup_shard_info");
|
||||
auto startT = eggsNow();
|
||||
std::vector<ShardInfo> shards;
|
||||
for (;;) {
|
||||
if (eggsNow() - startT > 20_sec) {
|
||||
throw EGGS_EXCEPTION("could not reach shuckle to get shards after 20 seconds, giving up");
|
||||
}
|
||||
|
||||
std::string err = fetchShards(options.shuckleAddr, options.shucklePort, 100_ms, shards);
|
||||
if (!err.empty()) {
|
||||
LOG_INFO(env, "failed to reach shuckle at %s:%s to fetch shards, might retry: %s", options.shuckleAddr, options.shucklePort, err);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
continue;
|
||||
}
|
||||
|
||||
bool badShard = false;
|
||||
for (int i = 0; i < 256; i++) {
|
||||
const auto& shard = shards.at(i);
|
||||
if (shard.port == 0) {
|
||||
LOG_INFO(env, "shard %s is not registered in shuckle yet, might retry", i);
|
||||
badShard = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (badShard) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
continue;
|
||||
}
|
||||
|
||||
LOG_INFO(env, "sucessfully fetched shards from shuckle");
|
||||
return shards;
|
||||
}
|
||||
}
|
||||
|
||||
void runCDC(const std::string& dbDir, const CDCOptions& options) {
|
||||
auto undertaker = Undertaker::acquireUndertaker();
|
||||
|
||||
std::ostream* logOut = &std::cout;
|
||||
std::ofstream fileOut;
|
||||
if (!options.logFile.empty()) {
|
||||
fileOut = std::ofstream(options.logFile, std::ios::out | std::ios::app);
|
||||
if (!fileOut.is_open()) {
|
||||
throw EGGS_EXCEPTION("Could not open log file `%s'\n", options.logFile);
|
||||
}
|
||||
logOut = &fileOut;
|
||||
}
|
||||
Logger logger(options.level, *logOut);
|
||||
|
||||
auto shards = lookupShardInfo(logger, options);
|
||||
|
||||
CDCDB db(logger, dbDir);
|
||||
|
||||
{
|
||||
auto server = std::make_unique<CDCServer>(logger, options, std::move(shards), db);
|
||||
pthread_t tid;
|
||||
if (pthread_create(&tid, nullptr, &runCDCServer, &*server) != 0) {
|
||||
throw SYSCALL_EXCEPTION("pthread_create");
|
||||
}
|
||||
undertaker->checkin(std::move(server), tid, "server");
|
||||
}
|
||||
|
||||
undertaker->reap();
|
||||
}
|
||||
Reference in New Issue
Block a user