ternfs-XTXMarkets/cpp/cdc/CDC.cpp
Francesco Mazzoli 38f3d54ecd Wait forever, rather than having timeouts
The goal here is to not have constant wakeups due to timeout. Do
not attempt to clean things up nicely before termination -- just
terminate instead. We can setup a proper termination system in
the future, I first want to see if this makes a difference.

Also, change xmon to use pipes for communication, so that it can
wait without timers as well.

Also, `write` directly for logging, so that we know the logs will
make it to the file after the logging call returns (since we now
do not have the chance to flush them afterwards).
2023-12-07 10:11:19 +00:00

#include <chrono>
#include <fstream>
#include <memory>
#include <mutex>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <sys/socket.h>
#include <atomic>
#include <fcntl.h>
#include <optional>
#include <thread>
#include <unordered_map>
#include <arpa/inet.h>
#include <unordered_set>
#include <map>
#include <poll.h>
#include "Bincode.hpp"
#include "CDC.hpp"
#include "CDCDB.hpp"
#include "Env.hpp"
#include "Exception.hpp"
#include "Msgs.hpp"
#include "Shard.hpp"
#include "Time.hpp"
#include "CDCDB.hpp"
#include "Crypto.hpp"
#include "CDCKey.hpp"
#include "Shuckle.hpp"
#include "XmonAgent.hpp"
#include "wyhash.h"
#include "Xmon.hpp"
#include "Timings.hpp"
#include "PeriodicLoop.hpp"
#include "Loop.hpp"
#include "ErrorCount.hpp"
struct CDCShared {
CDCDB& db;
std::array<std::atomic<uint16_t>, 2> ownPorts;
std::mutex shardsMutex;
std::array<ShardInfo, 256> shards;
// How long it took us to process the entire request, from parse to response.
std::array<Timings, maxCDCMessageKind+1> timingsTotal;
std::array<ErrorCount, maxCDCMessageKind+1> errors;
// right now we have a max of 200 req/s and we send the metrics every minute or so, so
// this should cover us for at least 1.5 mins. Power of two is good for mod.
std::array<std::atomic<size_t>, 0x4000> inFlightTxnsWindow;
ErrorCount shardErrors;
CDCShared(CDCDB& db_) : db(db_) {
for (auto& shard: shards) {
memset(&shard, 0, sizeof(shard));
}
ownPorts[0].store(0);
ownPorts[1].store(0);
for (CDCMessageKind kind : allCDCMessageKind) {
timingsTotal[(int)kind] = Timings::Standard();
}
for (int i = 0; i < inFlightTxnsWindow.size(); i++) {
inFlightTxnsWindow[i] = 0;
}
}
};
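// A request we've sent to a shard on behalf of a transaction. `sentAt` is what
// we use to time requests out.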
struct InFlightShardRequest {
CDCTxnId txnId; // the txn id that requested this shard request
EggsTime sentAt;
ShardId shid;
};
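// Per-transaction bookkeeping. `lastSentRequestId` is the id of the latest shard
// request we sent for this txn (reused for repeated requests). If hasClient is
// true we also remember where to send the response; it's false for transactions
// recovered from the db after a restart (see _processStep).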
struct InFlightCDCRequest {
bool hasClient;
uint64_t lastSentRequestId;
// if hasClient=false, the following is all garbage.
uint64_t cdcRequestId;
EggsTime receivedAt;
struct sockaddr_in clientAddr;
CDCMessageKind kind;
int sock;
};
// these can happen through normal user interaction
//
// MISMATCHING_CREATION_TIME can happen if we generate a timeout
// in CDC.cpp, but the edge was actually created, and when we
// try to recreate it we get a bad creation time.
static bool innocuousShardError(EggsError err) {
return err == EggsError::NAME_NOT_FOUND || err == EggsError::EDGE_NOT_FOUND || err == EggsError::DIRECTORY_NOT_EMPTY || err == EggsError::MISMATCHING_CREATION_TIME;
}
// These can happen but should be rare.
//
// DIRECTORY_HAS_OWNER can happen in gc (we clean it up and then remove
// it, but somebody else might have created stuff in it in the meantime)
static bool rareInnocuousShardError(EggsError err) {
return err == EggsError::DIRECTORY_HAS_OWNER;
}
struct InFlightCDCRequestKey {
uint64_t requestId;
uint32_t ip;
uint16_t port;
InFlightCDCRequestKey(uint64_t requestId_, struct sockaddr_in clientAddr_) :
requestId(requestId_), ip(clientAddr_.sin_addr.s_addr), port(clientAddr_.sin_port)
{}
bool operator==(const InFlightCDCRequestKey& other) const {
return requestId == other.requestId && ip == other.ip && port == other.port;
}
};
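// Combine the client ip/port with the request id into a single 64-bit value and
// hash that.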
template <>
struct std::hash<InFlightCDCRequestKey> {
std::size_t operator()(const InFlightCDCRequestKey& key) const {
return std::hash<uint64_t>{}(key.requestId ^ (((uint64_t)key.port << 32) | ((uint64_t)key.ip)));
}
};
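// Outstanding shard requests, keyed by shard request id, plus a secondary index
// ordered by send time so that `oldest()` can find the next request to time out.
// `insert` also handles re-sends of an existing request id by refreshing its
// send time.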
struct InFlightShardRequests {
private:
using RequestsMap = std::unordered_map<uint64_t, InFlightShardRequest>;
RequestsMap _reqs;
std::map<EggsTime, uint64_t> _pq;
public:
size_t size() const {
return _reqs.size();
}
RequestsMap::const_iterator oldest() const {
ALWAYS_ASSERT(size() > 0);
auto reqByTime = _pq.begin();
return _reqs.find(reqByTime->second);
}
RequestsMap::const_iterator find(uint64_t reqId) const {
return _reqs.find(reqId);
}
RequestsMap::const_iterator end() {
return _reqs.end();
}
void erase(RequestsMap::const_iterator iterator) {
_pq.erase(iterator->second.sentAt);
_reqs.erase(iterator);
}
void insert(uint64_t reqId, const InFlightShardRequest& req) {
auto [reqIt, inserted] = _reqs.insert({reqId, req});
// TODO i think we can just assert inserted, we never need this
// functionality
if (inserted) {
// we have never seen this shard request.
// technically we could get the same time twice, but in practice
// we won't, so just assert it.
ALWAYS_ASSERT(_pq.insert({req.sentAt, reqId}).second);
} else {
// we had already seen this. make sure it's for the same stuff, and update pq.
ALWAYS_ASSERT(reqIt->second.txnId == req.txnId);
ALWAYS_ASSERT(reqIt->second.shid == req.shid);
ALWAYS_ASSERT(_pq.erase(reqIt->second.sentAt) == 1); // must be already present
ALWAYS_ASSERT(_pq.insert({req.sentAt, reqId}).second); // insert with new time
reqIt->second.sentAt = req.sentAt; // update time in existing entry
}
}
};
struct CDCReqInfo {
uint64_t reqId;
struct sockaddr_in clientAddr;
EggsTime receivedAt;
int sock;
};
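// The main request server. It's a single-threaded loop: each step expires
// in-flight shard requests older than the shard timeout (turning them into
// TIMEOUT responses), blocks in poll() with no timeout, drains the shard
// response sockets and then the client sockets into one batch, applies the batch
// to the db in a single update(), and finally acts on the resulting CDCStep by
// replying to finished transactions and sending out shard requests for running
// ones.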
struct CDCServer : Loop {
private:
CDCShared& _shared;
bool _seenShards;
std::array<IpPort, 2> _ipPorts;
uint64_t _currentLogIndex;
std::vector<char> _recvBuf;
std::vector<char> _sendBuf;
std::vector<CDCReqContainer> _cdcReqs;
std::vector<CDCReqInfo> _cdcReqsInfo;
std::vector<CDCTxnId> _cdcReqsTxnIds;
std::vector<CDCShardResp> _shardResps;
int _maxUpdateSize;
CDCStep _step;
uint64_t _shardRequestIdCounter;
// order: CDC, shard, CDC, shard
// length will be 2 or 4 depending on whether we have a second ip
std::vector<struct pollfd> _socks;
AES128Key _expandedCDCKey;
Duration _shardTimeout;
// The requests we've enqueued, but haven't completed yet, with
// where to send the response. Indexed by txn id.
std::unordered_map<CDCTxnId, InFlightCDCRequest> _inFlightTxns;
uint64_t _inFlightTxnsWindowCursor;
// The enqueued requests, but indexed by req id + ip + port. We
// store this so that we can drop repeated requests which are
// still queued, and which will therefore be processed in due
// time anyway. This relies on clients having unique req ids. It's
// kinda unsafe anyway (if clients get restarted), but it's such
// a useful optimization for now that we live with it.
std::unordered_set<InFlightCDCRequestKey> _inFlightCDCReqs;
// The _shard_ request we're currently waiting for, if any.
InFlightShardRequests _inFlightShardReqs;
public:
CDCServer(Logger& logger, std::shared_ptr<XmonAgent>& xmon, const CDCOptions& options, CDCShared& shared) :
Loop(logger, xmon, "req_server"),
_shared(shared),
_seenShards(false),
_ipPorts(options.ipPorts),
_recvBuf(DEFAULT_UDP_MTU),
_sendBuf(DEFAULT_UDP_MTU),
_maxUpdateSize(500),
// important to not catch stray requests from previous executions
_shardRequestIdCounter(wyhash64_rand()),
_shardTimeout(options.shardTimeout),
_inFlightTxnsWindowCursor(0)
{
_currentLogIndex = _shared.db.lastAppliedLogEntry();
expandKey(CDCKey, _expandedCDCKey);
LOG_INFO(_env, "Waiting for shard info to be filled in");
}
virtual void step() override {
if (!_seenShards) {
if (!_waitForShards()) {
return;
}
_seenShards = true;
_initAfterShardsSeen();
}
// clear internal buffers
_cdcReqs.clear();
_cdcReqsInfo.clear();
_cdcReqsTxnIds.clear();
_shardResps.clear();
// Process CDC requests and shard responses
{
auto now = eggsNow();
while (_updateSize() < _maxUpdateSize) {
if (_inFlightShardReqs.size() == 0) { break; }
auto oldest = _inFlightShardReqs.oldest();
if ((now - oldest->second.sentAt) < _shardTimeout) { break; }
LOG_DEBUG(_env, "in-flight shard request %s was sent at %s, it's now %s, will time out (%s > %s)", oldest->first, oldest->second.sentAt, now, (now - oldest->second.sentAt), _shardTimeout);
uint64_t requestId = oldest->first;
auto resp = _prepareCDCShardResp(requestId); // erases `oldest`
ALWAYS_ASSERT(resp != nullptr); // must be there, we've just timed it out
_recordCDCShardRespError(requestId, *resp, EggsError::TIMEOUT);
}
}
LOG_DEBUG(_env, "Blocking to wait for readable sockets");
int ret = poll(_socks.data(), _socks.size(), -1);
if (ret < 0) {
throw SYSCALL_EXCEPTION("poll");
}
// Drain sockets, first the shard resps ones (so we clear existing txns
// first), then the CDC reqs ones.
for (int i = 1; i < _socks.size(); i += 2) {
const auto& sock = _socks[i];
if (sock.revents & (POLLIN|POLLHUP|POLLERR)) {
_drainShardSock(sock.fd);
}
}
for (int i = 0; i < _socks.size(); i += 2) {
const auto& sock = _socks[i];
if (sock.revents & (POLLIN|POLLHUP|POLLERR)) {
_drainCDCSock(sock.fd);
}
}
// If anything happened, update the db and write down the in flight CDCs
if (_cdcReqs.size() > 0 || _shardResps.size() > 0) {
// process everything in a single batch
_shared.db.update(true, _advanceLogIndex(), _cdcReqs, _shardResps, _step, _cdcReqsTxnIds);
// record txn ids etc. for newly received requests
for (int i = 0; i < _cdcReqs.size(); i++) {
const auto& req = _cdcReqs[i];
const auto& reqInfo = _cdcReqsInfo[i];
CDCTxnId txnId = _cdcReqsTxnIds[i];
ALWAYS_ASSERT(_inFlightTxns.find(txnId) == _inFlightTxns.end());
auto& inFlight = _inFlightTxns[txnId];
inFlight.hasClient = true;
inFlight.cdcRequestId = reqInfo.reqId;
inFlight.clientAddr = reqInfo.clientAddr;
inFlight.kind = req.kind();
inFlight.receivedAt = reqInfo.receivedAt;
inFlight.sock = reqInfo.sock;
_updateInFlightTxns();
_inFlightCDCReqs.insert(InFlightCDCRequestKey(reqInfo.reqId, reqInfo.clientAddr));
}
_processStep();
}
}
private:
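// Record the current number of in-flight transactions in the shared window; the
// metrics inserter averages over the whole window.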
void _updateInFlightTxns() {
_shared.inFlightTxnsWindow[_inFlightTxnsWindowCursor%_shared.inFlightTxnsWindow.size()].store(_inFlightTxns.size());
_inFlightTxnsWindowCursor++;
}
bool _waitForShards() {
bool badShard = false;
{
const std::lock_guard<std::mutex> lock(_shared.shardsMutex);
for (int i = 0; i < _shared.shards.size(); i++) {
const auto sh = _shared.shards[i];
if (sh.port1 == 0) {
LOG_DEBUG(_env, "Shard %s isn't ready yet", i);
badShard = true;
break;
}
}
}
if (badShard) {
(10_ms).sleep();
return false;
}
LOG_INFO(_env, "shards found, proceeding");
return true;
}
// To be called when we have a shard response with the given `reqId`.
// Looks it up in the in-flight map, removes it from there, and adds a
// CDCShardResp to `_shardResps`, filling in txnId and nothing else.
// Returns nullptr if we couldn't find the in-flight request.
CDCShardResp* _prepareCDCShardResp(uint64_t reqId) {
// If it's not the request we wanted, skip
auto reqIt = _inFlightShardReqs.find(reqId);
if (reqIt == _inFlightShardReqs.end()) {
// This is a fairly common occurrence when timing out
LOG_DEBUG(_env, "got unexpected shard request id %s, dropping", reqId);
return nullptr;
}
CDCTxnId txnId = reqIt->second.txnId;
auto& resp = _shardResps.emplace_back();
resp.txnId = txnId;
_inFlightShardReqs.erase(reqIt); // not in flight anymore
return &resp;
}
void _recordCDCShardRespError(uint64_t requestId, CDCShardResp& resp, EggsError err) {
resp.err = err;
_shared.shardErrors.add(err);
if (resp.err == EggsError::TIMEOUT) {
LOG_DEBUG(_env, "txn %s shard req %s, timed out", resp.txnId, requestId);
} else if (innocuousShardError(resp.err)) {
LOG_DEBUG(_env, "txn %s shard req %s, finished with innocuous error %s", resp.txnId, requestId, resp.err);
} else if (rareInnocuousShardError(resp.err)) {
LOG_INFO(_env, "txn %s shard req %s, finished with rare innocuous error %s", resp.txnId, requestId, resp.err);
} else {
RAISE_ALERT(_env, "txn %s, req id %s, finished with error %s", resp.txnId, requestId, resp.err);
}
}
void _initAfterShardsSeen() {
// initialize everything after having seen the shards
// Create sockets. We create one socket for listening to client requests and one for listening
// to the shards' responses. If we have two IPs we do this twice.
_socks.resize((_ipPorts[1].ip == 0) ? 2 : 4);
for (int i = 0; i < _socks.size(); i++) {
int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
_socks[i].fd = sock;
_socks[i].events = POLLIN;
if (sock < 0) {
throw SYSCALL_EXCEPTION("cannot create socket");
}
if (fcntl(sock, F_SETFL, O_NONBLOCK) == -1) {
throw SYSCALL_EXCEPTION("fcntl");
}
struct sockaddr_in addr;
addr.sin_family = AF_INET;
{
uint32_t ipN = htonl(_ipPorts[i/2].ip);
memcpy(&addr.sin_addr.s_addr, &ipN, 4);
}
if (i%2 == 0 && _ipPorts[i/2].port != 0) { // CDC with specified port
addr.sin_port = htons(_ipPorts[i/2].port);
} else { // automatically assigned port
addr.sin_port = 0;
}
if (bind(sock, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
throw SYSCALL_EXCEPTION("cannot bind socket to addr %s", addr);
}
{
socklen_t addrLen = sizeof(addr);
if (getsockname(sock, (struct sockaddr*)&addr, &addrLen) < 0) {
throw SYSCALL_EXCEPTION("getsockname");
}
}
if (i%2 == 0) {
LOG_DEBUG(_env, "bound CDC %s sock to port %s", i/2, ntohs(addr.sin_port));
_shared.ownPorts[i/2].store(ntohs(addr.sin_port));
} else {
LOG_DEBUG(_env, "bound shard %s sock to port %s", i/2, ntohs(addr.sin_port));
// CDC req/resps are very small (say 50bytes), so this gives us space for
// 20k responses, which paired with the high timeout we currently set in production
// (1s) should give us high throughput without retrying very often.
int bufSize = 1<<20;
if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (void*)&bufSize, sizeof(bufSize)) < 0) {
throw SYSCALL_EXCEPTION("setsockopt");
}
}
}
LOG_INFO(_env, "running on ports %s and %s", _shared.ownPorts[0].load(), _shared.ownPorts[1].load());
// If we've got dangling transactions, immediately start processing them
_shared.db.bootstrap(true, _advanceLogIndex(), _step);
_processStep();
}
size_t _updateSize() const {
return _cdcReqs.size() + _shardResps.size();
}
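// Read client requests off `sock` until EAGAIN or until the batch is full.
// Requests that are already in flight (same request id and client address) are
// dropped; requests whose header doesn't parse are dropped too; requests whose
// body doesn't parse (or has trailing bytes) get an immediate MALFORMED_REQUEST
// reply; everything else is queued up for the next db update.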
void _drainCDCSock(int sock) {
struct sockaddr_in clientAddr;
while (_updateSize() < _maxUpdateSize) {
// Read one request
memset(&clientAddr, 0, sizeof(clientAddr));
socklen_t addrLen = sizeof(clientAddr);
ssize_t read = recvfrom(sock, &_recvBuf[0], _recvBuf.size(), 0, (struct sockaddr*)&clientAddr, &addrLen);
if (read < 0 && errno == EAGAIN) {
return;
}
if (read < 0) {
throw SYSCALL_EXCEPTION("recvfrom");
}
LOG_DEBUG(_env, "received CDC request from %s", clientAddr);
BincodeBuf reqBbuf(&_recvBuf[0], read);
// First, try to parse the header
CDCRequestHeader reqHeader;
try {
reqHeader.unpack(reqBbuf);
} catch (const BincodeException& err) {
LOG_ERROR(_env, "could not parse: %s", err.what());
RAISE_ALERT(_env, "could not parse request header from %s, dropping it.", clientAddr);
continue;
}
LOG_DEBUG(_env, "received request id %s, kind %s", reqHeader.requestId, reqHeader.kind);
auto receivedAt = eggsNow();
// If we're already processing this request, drop it to try to not clog the queue
if (_inFlightCDCReqs.contains(InFlightCDCRequestKey(reqHeader.requestId, clientAddr))) {
LOG_DEBUG(_env, "dropping req id %s from %s since it's already being processed", reqHeader.requestId, clientAddr);
continue;
}
// If this gets filled in with an actual error code, it means that we couldn't
// process the request.
EggsError err = NO_ERROR;
// Now, try to parse the body
auto& cdcReq = _cdcReqs.emplace_back();
try {
cdcReq.unpack(reqBbuf, reqHeader.kind);
LOG_DEBUG(_env, "parsed request: %s", cdcReq);
} catch (const BincodeException& exc) {
LOG_ERROR(_env, "could not parse: %s", exc.what());
RAISE_ALERT(_env, "could not parse CDC request of kind %s from %s, will reply with error.", reqHeader.kind, clientAddr);
err = EggsError::MALFORMED_REQUEST;
}
// Make sure nothing is left
if (err == NO_ERROR && reqBbuf.remaining() != 0) {
RAISE_ALERT(_env, "%s bytes remaining after parsing CDC request of kind %s from %s, will reply with error", reqBbuf.remaining(), reqHeader.kind, clientAddr);
err = EggsError::MALFORMED_REQUEST;
}
if (err == NO_ERROR) {
LOG_DEBUG(_env, "CDC request %s successfully parsed, will process soon", cdcReq.kind());
_cdcReqsInfo.emplace_back(CDCReqInfo{
.reqId = reqHeader.requestId,
.clientAddr = clientAddr,
.receivedAt = receivedAt,
.sock = sock,
});
} else {
// We couldn't parse, reply immediately with an error
RAISE_ALERT(_env, "request %s failed before enqueue with error %s", cdcReq.kind(), err);
_sendError(sock, reqHeader.requestId, err, clientAddr);
_cdcReqs.pop_back(); // let's just forget all about this
}
}
}
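// Read shard responses off `sock` until EAGAIN or until the batch is full. Each
// response is matched against the in-flight shard requests by request id
// (unknown ids, e.g. ones we've already timed out, are dropped); error responses
// (kind 0) and successfully parsed bodies both end up in `_shardResps`.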
void _drainShardSock(int sock) {
while (_updateSize() < _maxUpdateSize) {
ssize_t read = recv(sock, &_recvBuf[0], _recvBuf.size(), 0);
if (read < 0 && errno == EAGAIN) {
return;
}
if (read < 0) {
throw SYSCALL_EXCEPTION("recv");
}
LOG_DEBUG(_env, "received response from shard");
BincodeBuf reqBbuf(&_recvBuf[0], read);
ShardResponseHeader respHeader;
try {
respHeader.unpack(reqBbuf);
} catch (const BincodeException& err) {
LOG_ERROR(_env, "could not parse: %s", err.what());
RAISE_ALERT(_env, "could not parse response header, dropping response");
continue;
}
LOG_DEBUG(_env, "received response id %s, kind %s", respHeader.requestId, respHeader.kind);
// Note that below we just let the BincodeExceptions propagate upwards since we
// control all the code in this codebase, and the header is good, and we're a
// bit lazy.
auto shardResp = _prepareCDCShardResp(respHeader.requestId);
if (shardResp == nullptr) {
// we couldn't find it
continue;
}
// We got an error
if (respHeader.kind == (ShardMessageKind)0) {
_recordCDCShardRespError(respHeader.requestId, *shardResp, reqBbuf.unpackScalar<EggsError>());
LOG_DEBUG(_env, "got error %s for response id %s", shardResp->err, respHeader.requestId);
continue;
}
// Otherwise, parse the body
shardResp->resp.unpack(reqBbuf, respHeader.kind);
LOG_DEBUG(_env, "parsed shard response: %s", shardResp->resp);
ALWAYS_ASSERT(reqBbuf.remaining() == 0);
_shared.shardErrors.add(NO_ERROR);
// If all went well, advance with the newly received request
LOG_DEBUG(_env, "successfully parsed shard response %s with kind %s, process soon", respHeader.requestId, respHeader.kind);
}
}
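// Shard request ids come from a counter seeded with wyhash64_rand() in the
// constructor, so responses addressed to a previous incarnation of the CDC won't
// match anything here. Wraparound is harmless, hence the sanitizer opt-out.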
#ifdef __clang__
__attribute__((no_sanitize("integer"))) // might wrap around (it's initialized randomly)
#endif
inline uint64_t _freshShardReqId() {
_shardRequestIdCounter++;
return _shardRequestIdCounter;
}
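// Act on the output of a db update. Finished transactions get their response (or
// error) sent back to the client, if one is attached, and their bookkeeping
// removed. Running transactions get the next shard request: repeated requests
// reuse the previous request id so the first response to come back is accepted,
// privileged requests are MACed with the CDC key, and each request is recorded
// in `_inFlightShardReqs` so its response can be matched.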
void _processStep() {
LOG_DEBUG(_env, "processing step %s", _step);
// finished txns
for (const auto& [txnId, resp]: _step.finishedTxns) {
LOG_DEBUG(_env, "txn %s finished", txnId);
// we need to send the response back to the client
auto inFlight = _inFlightTxns.find(txnId);
if (inFlight->second.hasClient) {
_shared.timingsTotal[(int)inFlight->second.kind].add(eggsNow() - inFlight->second.receivedAt);
_shared.errors[(int)inFlight->second.kind].add(resp.err);
if (resp.err != NO_ERROR) {
_sendError(inFlight->second.sock, inFlight->second.cdcRequestId, resp.err, inFlight->second.clientAddr);
} else {
LOG_DEBUG(_env, "sending response with req id %s, kind %s, back to %s", inFlight->second.cdcRequestId, inFlight->second.kind, inFlight->second.clientAddr);
BincodeBuf bbuf(&_sendBuf[0], _sendBuf.size());
CDCResponseHeader respHeader(inFlight->second.cdcRequestId, inFlight->second.kind);
respHeader.pack(bbuf);
resp.resp.pack(bbuf);
_send(inFlight->second.sock, inFlight->second.clientAddr, (const char*)bbuf.data, bbuf.len());
}
_inFlightCDCReqs.erase(InFlightCDCRequestKey(inFlight->second.cdcRequestId, inFlight->second.clientAddr));
}
_inFlightTxns.erase(inFlight);
_updateInFlightTxns();
}
// in flight txns
for (const auto& [txnId, shardReq]: _step.runningTxns) {
CDCShardReq prevReq;
LOG_TRACE(_env, "txn %s needs shard %s, req %s", txnId, shardReq.shid, shardReq.req);
BincodeBuf bbuf(&_sendBuf[0], _sendBuf.size());
// Header
ShardRequestHeader shardReqHeader;
// Do not allocate new req id for repeated requests, so that we'll just accept
// the first one that comes back. There's a chance for the txnId to not be here
// yet: if we have just restarted the CDC. In this case we fill it in here, but
// obviously without client addr.
auto inFlightTxn = _inFlightTxns.find(txnId);
if (inFlightTxn == _inFlightTxns.end()) {
LOG_INFO(_env, "Could not find in-flight transaction %s, this might be because the CDC was restarted in the middle of a transaction.", txnId);
InFlightCDCRequest req;
req.hasClient = false;
req.lastSentRequestId = _freshShardReqId();
inFlightTxn = _inFlightTxns.emplace(txnId, req).first;
shardReqHeader.requestId = req.lastSentRequestId;
_updateInFlightTxns();
} else if (shardReq.repeated) {
shardReqHeader.requestId = inFlightTxn->second.lastSentRequestId;
} else {
shardReqHeader.requestId = _freshShardReqId();
}
shardReqHeader.kind = shardReq.req.kind();
shardReqHeader.pack(bbuf);
// Body
shardReq.req.pack(bbuf);
// MAC, if necessary
if (isPrivilegedRequestKind(shardReqHeader.kind)) {
bbuf.packFixedBytes<8>({cbcmac(_expandedCDCKey, bbuf.data, bbuf.len())});
}
// Send
struct sockaddr_in shardAddr;
memset(&shardAddr, 0, sizeof(shardAddr));
_shared.shardsMutex.lock();
ShardInfo shardInfo = _shared.shards[shardReq.shid.u8];
_shared.shardsMutex.unlock();
auto now = eggsNow();
// randomly pick one of the shard addrs and one of our sockets
int whichShardAddr = now.ns & !!shardInfo.port2;
int whichSock = (now.ns>>1) & !!_ipPorts[1].ip;
shardAddr.sin_family = AF_INET;
shardAddr.sin_port = htons(whichShardAddr ? shardInfo.port2 : shardInfo.port1);
static_assert(sizeof(shardAddr.sin_addr) == sizeof(shardInfo.ip1));
memcpy(&shardAddr.sin_addr, (whichShardAddr ? shardInfo.ip2 : shardInfo.ip1).data.data(), sizeof(shardAddr.sin_addr));
LOG_DEBUG(_env, "sending request for txn %s with req id %s to shard %s (%s)", txnId, shardReqHeader.requestId, shardReq.shid, shardAddr);
_send(_socks[whichSock*2 + 1].fd, shardAddr, (const char*)bbuf.data, bbuf.len());
// Record the in-flight req
_inFlightShardReqs.insert(shardReqHeader.requestId, InFlightShardRequest{
.txnId = txnId,
.sentAt = now,
.shid = shardReq.shid,
});
inFlightTxn->second.lastSentRequestId = shardReqHeader.requestId;
}
}
void _sendError(int sock, uint64_t requestId, EggsError err, struct sockaddr_in& clientAddr) {
BincodeBuf respBbuf(&_sendBuf[0], _sendBuf.size());
CDCResponseHeader(requestId, CDCMessageKind::ERROR).pack(respBbuf);
respBbuf.packScalar<uint16_t>((uint16_t)err);
// We're sending an error back to a user, so always send it from the CDC sock.
_send(sock, clientAddr, (const char*)respBbuf.data, respBbuf.len());
LOG_DEBUG(_env, "sent error %s to %s", err, clientAddr);
}
void _send(int sock, struct sockaddr_in& dest, const char* data, size_t len) {
// We need to handle EAGAIN/EPERM when trying to send. Here we take a ...
// lazy approach and just loop with a delay. This seems to happen when
// we restart everything while under load, it's not great to block here
// but it's probably OK to do so in those cases. We should also automatically
// clear the alert when done with this.
XmonNCAlert alert(1_sec);
for (;;) {
if (likely(sendto(sock, data, len, 0, (struct sockaddr*)&dest, sizeof(dest)) == len)) {
break;
}
int err = errno;
// Note that we get EPERM on `sendto` when nf drops packets.
if (likely(err == EAGAIN || err == EPERM)) {
_env.updateAlert(alert, "we got %s/%s=%s when trying to send shard message, will wait and retry", err, translateErrno(err), safe_strerror(err));
(100_ms).sleepRetry();
} else {
_env.clearAlert(alert);
throw EXPLICIT_SYSCALL_EXCEPTION(err, "sendto");
}
}
_env.clearAlert(alert);
}
uint64_t _advanceLogIndex() {
return ++_currentLogIndex;
}
};
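// Periodically fetches the shard addresses from shuckle and, once every shard
// has one, publishes them into the shared state for the request server.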
struct CDCShardUpdater : PeriodicLoop {
CDCShared& _shared;
std::string _shuckleHost;
uint16_t _shucklePort;
// loop data
std::array<ShardInfo, 256> _shards;
XmonNCAlert _alert;
public:
CDCShardUpdater(Logger& logger, std::shared_ptr<XmonAgent>& xmon, const CDCOptions& options, CDCShared& shared):
PeriodicLoop(logger, xmon, "shard_updater", {1_sec, 1_mins}),
_shared(shared),
_shuckleHost(options.shuckleHost),
_shucklePort(options.shucklePort),
_alert(10_sec)
{
_env.updateAlert(_alert, "Waiting to get shards");
}
virtual bool periodicStep() override {
LOG_INFO(_env, "Fetching shards");
std::string err = fetchShards(_shuckleHost, _shucklePort, 10_sec, _shards);
if (!err.empty()) {
_env.updateAlert(_alert, "failed to reach shuckle at %s:%s to fetch shards, will retry: %s", _shuckleHost, _shucklePort, err);
return false;
}
bool badShard = false;
for (int i = 0; i < _shards.size(); i++) {
if (_shards[i].port1 == 0) {
badShard = true;
break;
}
}
if (badShard) {
_env.updateAlert(_alert, "Shard info is still not present in shuckle, will keep trying");
return false;
}
{
const std::lock_guard<std::mutex> lock(_shared.shardsMutex);
for (int i = 0; i < _shards.size(); i++) {
_shared.shards[i] = _shards[i];
}
}
_env.clearAlert(_alert);
LOG_INFO(_env, "successfully fetched all shards from shuckle, will wait one minute");
return true;
}
};
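// Periodically registers this CDC's addresses with shuckle, once the request
// server has bound its ports.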
struct CDCRegisterer : PeriodicLoop {
CDCShared& _shared;
uint32_t _ownIp1;
uint32_t _ownIp2;
std::string _shuckleHost;
uint16_t _shucklePort;
bool _hasSecondIp;
XmonNCAlert _alert;
public:
CDCRegisterer(Logger& logger, std::shared_ptr<XmonAgent>& xmon, const CDCOptions& options, CDCShared& shared):
PeriodicLoop(logger, xmon, "registerer", { 1_sec, 1_mins }),
_shared(shared),
_ownIp1(options.ipPorts[0].ip),
_ownIp2(options.ipPorts[1].ip),
_shuckleHost(options.shuckleHost),
_shucklePort(options.shucklePort),
_hasSecondIp(options.ipPorts[1].ip != 0),
_alert(10_sec)
{}
virtual bool periodicStep() override {
uint16_t port1 = _shared.ownPorts[0].load();
uint16_t port2 = _shared.ownPorts[1].load();
if (port1 == 0 || (_hasSecondIp && port2 == 0)) {
return false;
}
LOG_DEBUG(_env, "Registering ourselves (CDC, %s:%s, %s:%s) with shuckle", in_addr{htonl(_ownIp1)}, port1, in_addr{htonl(_ownIp2)}, port2);
std::string err = registerCDC(_shuckleHost, _shucklePort, 10_sec, _ownIp1, port1, _ownIp2, port2);
if (!err.empty()) {
_env.updateAlert(_alert, "Couldn't register ourselves with shuckle: %s", err);
return false;
}
_env.clearAlert(_alert);
return true;
}
};
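// Periodically converts the per-kind timings and error counts into stats,
// inserts them into shuckle, and resets them on success.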
struct CDCStatsInserter : PeriodicLoop {
private:
CDCShared& _shared;
std::string _shuckleHost;
uint16_t _shucklePort;
XmonNCAlert _alert;
std::vector<Stat> _stats;
public:
CDCStatsInserter(Logger& logger, std::shared_ptr<XmonAgent>& xmon, const CDCOptions& options, CDCShared& shared):
PeriodicLoop(logger, xmon, "stats_inserter", {1_sec, 1_hours}),
_shared(shared),
_shuckleHost(options.shuckleHost),
_shucklePort(options.shucklePort),
_alert(10_sec)
{}
virtual bool periodicStep() override {
std::string err;
for (CDCMessageKind kind : allCDCMessageKind) {
{
std::ostringstream prefix;
prefix << "cdc." << kind;
_shared.timingsTotal[(int)kind].toStats(prefix.str(), _stats);
_shared.errors[(int)kind].toStats(prefix.str(), _stats);
}
}
err = insertStats(_shuckleHost, _shucklePort, 10_sec, _stats);
_stats.clear();
if (err.empty()) {
_env.clearAlert(_alert);
for (CDCMessageKind kind : allCDCMessageKind) {
_shared.timingsTotal[(int)kind].reset();
_shared.errors[(int)kind].reset();
}
return true;
} else {
_env.updateAlert(_alert, "Could not insert stats: %s", err);
return false;
}
}
// TODO restore this when we can
// virtual void finish() override {
// LOG_INFO(_env, "inserting stats one last time");
// periodicStep();
// }
};
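// Periodically builds metrics (requests by kind and error, average in-flight
// transactions over the shared window, shard request outcomes, RocksDB counters)
// and ships them to influxdb.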
struct CDCMetricsInserter : PeriodicLoop {
private:
CDCShared& _shared;
XmonNCAlert _alert;
MetricsBuilder _metricsBuilder;
std::unordered_map<std::string, uint64_t> _rocksDBStats;
public:
CDCMetricsInserter(Logger& logger, std::shared_ptr<XmonAgent>& xmon, CDCShared& shared):
PeriodicLoop(logger, xmon, "metrics", {1_sec, 1.0, 1_mins, 0.1}),
_shared(shared),
_alert(10_sec)
{}
virtual bool periodicStep() override {
auto now = eggsNow();
for (CDCMessageKind kind : allCDCMessageKind) {
const ErrorCount& errs = _shared.errors[(int)kind];
for (int i = 0; i < errs.count.size(); i++) {
uint64_t count = errs.count[i].load();
if (count == 0) { continue; }
_metricsBuilder.measurement("eggsfs_cdc_requests");
_metricsBuilder.tag("kind", kind);
if (i == 0) {
_metricsBuilder.tag("error", "NO_ERROR");
} else {
_metricsBuilder.tag("error", (EggsError)i);
}
_metricsBuilder.fieldU64("count", count);
_metricsBuilder.timestamp(now);
}
}
{
_metricsBuilder.measurement("eggsfs_cdc_in_flight_txns");
uint64_t sum = 0;
for (size_t x: _shared.inFlightTxnsWindow) {
sum += x;
}
_metricsBuilder.fieldFloat("size", (double)sum / (double)_shared.inFlightTxnsWindow.size());
_metricsBuilder.timestamp(now);
}
for (int i = 0; i < _shared.shardErrors.count.size(); i++) {
uint64_t count = _shared.shardErrors.count[i].load();
if (count == 0) { continue; }
_metricsBuilder.measurement("eggsfs_cdc_shard_requests");
if (i == 0) {
_metricsBuilder.tag("error", "NO_ERROR");
} else {
_metricsBuilder.tag("error", (EggsError)i);
}
_metricsBuilder.fieldU64("count", count);
_metricsBuilder.timestamp(now);
}
{
_rocksDBStats.clear();
_shared.db.rocksDBMetrics(_rocksDBStats);
for (const auto& [name, value]: _rocksDBStats) {
_metricsBuilder.measurement("eggsfs_cdc_rocksdb");
_metricsBuilder.fieldU64(name, value);
_metricsBuilder.timestamp(now);
}
}
std::string err = sendMetrics(10_sec, _metricsBuilder.payload());
_metricsBuilder.reset();
if (err.empty()) {
LOG_INFO(_env, "Sent metrics to influxdb");
_env.clearAlert(_alert);
return true;
} else {
_env.updateAlert(_alert, "Could not insert metrics: %s", err);
return false;
}
}
};
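// Entry point: set up logging and (optionally) xmon, open the database, then run
// the request server and the periodic loops above, each on its own thread. The
// main thread just sleeps forever; there is currently no orderly shutdown, the
// process is simply terminated.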
void runCDC(const std::string& dbDir, const CDCOptions& options) {
int logOutFd = STDOUT_FILENO;
if (!options.logFile.empty()) {
logOutFd = open(options.logFile.c_str(), O_WRONLY|O_CREAT|O_APPEND, 0644);
if (logOutFd < 0) {
throw SYSCALL_EXCEPTION("open");
}
}
Logger logger(options.logLevel, logOutFd, options.syslog, true);
std::shared_ptr<XmonAgent> xmon;
if (options.xmon) {
xmon = std::make_shared<XmonAgent>();
}
Env env(logger, xmon, "startup");
LOG_INFO(env, "Running CDC with options:");
LOG_INFO(env, " level = %s", options.logLevel);
LOG_INFO(env, " logFile = '%s'", options.logFile);
LOG_INFO(env, " port = %s", options.port);
LOG_INFO(env, " shuckleHost = '%s'", options.shuckleHost);
LOG_INFO(env, " shucklePort = %s", options.shucklePort);
for (int i = 0; i < 2; i++) {
LOG_INFO(env, " port%s = %s", i+1, options.ipPorts[0].port);
{
char ip[INET_ADDRSTRLEN];
uint32_t ipN = options.ipPorts[i].ip;
LOG_INFO(env, " ownIp%s = %s", i+1, inet_ntop(AF_INET, &ipN, ip, INET_ADDRSTRLEN));
}
}
LOG_INFO(env, " syslog = %s", (int)options.syslog);
std::vector<std::thread> threads;
// xmon first, so that by the time it shuts down it'll have all the leftover requests
if (xmon) {
threads.emplace_back([&logger, xmon, &options]() mutable {
XmonConfig config;
config.appInstance = "eggscdc";
config.appType = "restech_eggsfs.critical";
config.prod = options.xmonProd;
Xmon(logger, xmon, config).run();
});
}
CDCDB db(logger, xmon, dbDir);
CDCShared shared(db);
threads.emplace_back([&logger, xmon, &options, &shared]() mutable {
CDCServer(logger, xmon, options, shared).run();
});
threads.emplace_back([&logger, xmon, &options, &shared]() mutable {
CDCShardUpdater(logger, xmon, options, shared).run();
});
threads.emplace_back([&logger, xmon, &options, &shared]() mutable {
CDCRegisterer(logger, xmon, options, shared).run();
});
threads.emplace_back([&logger, xmon, &options, &shared]() mutable {
CDCStatsInserter(logger, xmon, options, shared).run();
});
if (options.metrics) {
threads.emplace_back([&logger, xmon, &shared]() mutable {
CDCMetricsInserter(logger, xmon, shared).run();
});
}
for (;;) { sleep(60*60*24); }
}