Files
ternfs-XTXMarkets/cpp/shard/Shard.cpp
Francesco Mazzoli 8075e99bb6 Graceful shard teardown
See <https://mazzo.li/posts/stopping-linux-threads.html> for tradeoffs
regarding how to terminate threads gracefully.

The goal of this work was for valgrind to work correctly, which in turn
was to investigate #141. It looks like I have succeeded:

    ==2715080== Warning: unimplemented fcntl command: 1036
    ==2715080== 20,052 bytes in 5,013 blocks are definitely lost in loss record 133 of 135
    ==2715080==    at 0x483F013: operator new(unsigned long) (in /usr/lib/valgrind/vgpreload_memcheck-amd64-linux.so)
    ==2715080==    by 0x3B708E: allocate (new_allocator.h:121)
    ==2715080==    by 0x3B708E: allocate (allocator.h:173)
    ==2715080==    by 0x3B708E: allocate (alloc_traits.h:460)
    ==2715080==    by 0x3B708E: _M_allocate (stl_vector.h:346)
    ==2715080==    by 0x3B708E: std::vector<Crc, std::allocator<Crc> >::_M_default_append(unsigned long) (vector.tcc:635)
    ==2715080==    by 0x42BF1C: resize (stl_vector.h:940)
    ==2715080==    by 0x42BF1C: ShardDBImpl::_fileSpans(rocksdb::ReadOptions&, FileSpansReq const&, FileSpansResp&) (shard/ShardDB.cpp:921)
    ==2715080==    by 0x420867: ShardDBImpl::read(ShardReqContainer const&, ShardRespContainer&) (shard/ShardDB.cpp:1034)
    ==2715080==    by 0x3CB3EE: ShardServer::_handleRequest(int, sockaddr_in*, char*, unsigned long) (shard/Shard.cpp:347)
    ==2715080==    by 0x3C8A39: ShardServer::step() (shard/Shard.cpp:405)
    ==2715080==    by 0x40B1E8: run (core/Loop.cpp:67)
    ==2715080==    by 0x40B1E8: startLoop(void*) (core/Loop.cpp:37)
    ==2715080==    by 0x4BEA258: start_thread (in /usr/lib/libpthread-2.33.so)
    ==2715080==    by 0x4D005E2: clone (in /usr/lib/libc-2.33.so)
    ==2715080==
    ==2715080==
    ==2715080== Exit program on first error (--exit-on-first-error=yes)
2024-01-08 15:41:22 +00:00

907 lines
36 KiB
C++

#include <atomic>
#include <memory>
#include <mutex>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <sys/socket.h>
#include <fstream>
#include <chrono>
#include <thread>
#include <arpa/inet.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <poll.h>
#include "Assert.hpp"
#include "Bincode.hpp"
#include "Common.hpp"
#include "Crypto.hpp"
#include "Exception.hpp"
#include "Msgs.hpp"
#include "MsgsGen.hpp"
#include "Shard.hpp"
#include "Env.hpp"
#include "ShardDB.hpp"
#include "CDCKey.hpp"
#include "Shuckle.hpp"
#include "Time.hpp"
#include "Time.hpp"
#include "wyhash.h"
#include "Xmon.hpp"
#include "Timings.hpp"
#include "ErrorCount.hpp"
#include "PeriodicLoop.hpp"
#include "Metrics.hpp"
#include "Loop.hpp"
#include "SPSC.hpp"
struct QueuedShardLogEntry {
ShardLogEntry logEntry;
// if requestId == 0, the rest is all garbage -- we don't need it.
// this is for log entries that are not generated by users (e.g.
// block service updates).
uint64_t requestId;
EggsTime receivedAt;
struct sockaddr_in clientAddr;
int sockIx; // which sock to use to reply
ShardMessageKind requestKind;
};
// TODO make options
const int LOG_ENTRIES_QUEUE_SIZE = 8192; // a few megabytes, should be quite a bit bigger than the below
const int RECVMMSG_LEN = 1000; // how many messages to read at once at most
const int MAX_WRITES_AT_ONCE = 200; // say that each write is ~100bytes, this gives us 20KB of write
const int MAX_RECV_MSGS = 100;
struct ShardShared {
ShardDB& db;
std::array<std::atomic<uint32_t>, 2> ips;
std::array<std::atomic<uint32_t>, 2> ports;
std::array<struct pollfd, 2> socks;
std::atomic<bool> blockServicesWritten;
std::array<Timings, maxShardMessageKind+1> timings;
std::array<ErrorCount, maxShardMessageKind+1> errors;
SPSC<QueuedShardLogEntry> logEntriesQueue;
std::mutex logEntriesQueuePushLock; // almost always uncontended
std::atomic<double> logEntriesQueueSize;
std::array<std::atomic<double>, 2> receivedRequests; // how many requests we got at once from each socket
std::atomic<double> pulledWriteRequests; // how many requests we got from write queue
ShardShared() = delete;
ShardShared(ShardDB& db_): db(db_), ips{0, 0}, ports{0, 0}, blockServicesWritten(false), logEntriesQueue(LOG_ENTRIES_QUEUE_SIZE), logEntriesQueueSize(0), pulledWriteRequests(0) {
for (ShardMessageKind kind : allShardMessageKind) {
timings[(int)kind] = Timings::Standard();
}
for (auto& x: receivedRequests) {
x = 0;
}
}
};
static bool bigRequest(ShardMessageKind kind) {
return unlikely(
kind == ShardMessageKind::ADD_SPAN_INITIATE ||
kind == ShardMessageKind::ADD_SPAN_CERTIFY
);
}
static bool bigResponse(ShardMessageKind kind) {
return unlikely(
kind == ShardMessageKind::READ_DIR ||
kind == ShardMessageKind::ADD_SPAN_INITIATE ||
kind == ShardMessageKind::FILE_SPANS ||
kind == ShardMessageKind::VISIT_DIRECTORIES ||
kind == ShardMessageKind::VISIT_FILES ||
kind == ShardMessageKind::VISIT_TRANSIENT_FILES ||
kind == ShardMessageKind::BLOCK_SERVICE_FILES ||
kind == ShardMessageKind::FULL_READ_DIR
);
}
static void packResponse(
Env& env,
ShardShared& shared,
std::vector<char>& sendBuf,
std::vector<struct mmsghdr>& sendHdrs,
std::vector<struct iovec>& sendVecs,
uint64_t requestId,
ShardMessageKind kind,
Duration elapsed,
bool dropArtificially,
struct sockaddr_in* clientAddr,
int sockIx,
EggsError err,
const ShardRespContainer& resp
) {
// pack into sendBuf
size_t sendBufBegin = sendBuf.size();
sendBuf.resize(sendBufBegin + MAX_UDP_MTU);
BincodeBuf respBbuf(&sendBuf[sendBufBegin], MAX_UDP_MTU);
if (err == NO_ERROR) {
LOG_DEBUG(env, "successfully processed request %s with kind %s in %s", requestId, kind, elapsed);
if (bigResponse(kind)) {
if (unlikely(env._shouldLog(LogLevel::LOG_TRACE))) {
LOG_TRACE(env, "resp body: %s", resp);
} else {
LOG_DEBUG(env, "resp body: <omitted>");
}
} else {
LOG_DEBUG(env, "resp body: %s", resp);
}
ShardResponseHeader(requestId, kind).pack(respBbuf);
resp.pack(respBbuf);
} else {
LOG_DEBUG(env, "request %s failed with error %s in %s", kind, err, elapsed);
ShardResponseHeader(requestId, ShardMessageKind::ERROR).pack(respBbuf);
respBbuf.packScalar<uint16_t>((uint16_t)err);
}
sendBuf.resize(sendBufBegin + respBbuf.len());
shared.timings[(int)kind].add(elapsed);
shared.errors[(int)kind].add(err);
if (unlikely(dropArtificially)) {
LOG_DEBUG(env, "artificially dropping response %s", requestId);
sendBuf.resize(sendBufBegin);
return;
}
LOG_DEBUG(env, "will send response for req id %s kind %s to %s", requestId, kind, *clientAddr);
// Prepare sendmmsg stuff. The vectors might be resized by the
// time we get to sending this, so store references when we must
// -- we'll fix up the actual values later.
auto& hdr = sendHdrs.emplace_back();
hdr.msg_hdr = {
.msg_name = clientAddr,
.msg_namelen = sizeof(*clientAddr),
.msg_iovlen = 1,
};
hdr.msg_len = respBbuf.len();
auto& vec = sendVecs.emplace_back();
vec.iov_base = (void*)sendBufBegin;
vec.iov_len = respBbuf.len();
}
struct ShardServer : Loop {
private:
// init data
ShardShared& _shared;
ShardId _shid;
std::array<IpPort, 2> _ipPorts;
uint64_t _packetDropRand;
uint64_t _incomingPacketDropProbability; // probability * 10,000
uint64_t _outgoingPacketDropProbability; // probability * 10,000
// run data
AES128Key _expandedCDCKey;
// recvmmsg data (one per socket, since we receive from both and send from both
// using some of the header data)
std::array<std::vector<char>, 2> _recvBuf;
std::array<std::vector<struct mmsghdr>, 2> _recvHdrs;
std::array<std::vector<struct sockaddr_in>, 2> _recvAddrs;
std::array<std::vector<struct iovec>, 2> _recvVecs;
// what we parse into
ShardReqContainer _reqContainer;
ShardRespContainer _respContainer;
// log entries buffers
std::vector<QueuedShardLogEntry> _logEntries;
// sendmmsg data
std::vector<char> _sendBuf;
std::array<std::vector<struct mmsghdr>, 2> _sendHdrs; // one per socket
std::array<std::vector<struct iovec>, 2> _sendVecs;
public:
ShardServer(Logger& logger, std::shared_ptr<XmonAgent>& xmon, ShardId shid, const ShardOptions& options, ShardShared& shared) :
Loop(logger, xmon, "server"),
_shared(shared),
_shid(shid),
_ipPorts(options.ipPorts),
_packetDropRand(eggsNow().ns),
_incomingPacketDropProbability(0),
_outgoingPacketDropProbability(0)
{
auto convertProb = [this](const std::string& what, double prob, uint64_t& iprob) {
if (prob != 0.0) {
LOG_INFO(_env, "Will drop %s%% of %s packets", prob*100.0, what);
iprob = prob * 10'000.0;
ALWAYS_ASSERT(iprob > 0 && iprob < 10'000);
}
};
convertProb("incoming", options.simulateIncomingPacketDrop, _incomingPacketDropProbability);
convertProb("outgoing", options.simulateOutgoingPacketDrop, _outgoingPacketDropProbability);
_init();
}
virtual ~ShardServer() = default;
private:
void _init() {
LOG_INFO(_env, "initializing server sockets");
expandKey(CDCKey, _expandedCDCKey);
memset(_shared.socks.data(), 0, _shared.socks.size()*sizeof(_shared.socks[0]));
for (int i = 0; i < _ipPorts.size(); i++) {
const auto& ipPort = _ipPorts[i];
if (ipPort.ip == 0) { break; }
int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (sock < 0) {
throw SYSCALL_EXCEPTION("cannot create socket");
}
struct sockaddr_in serverAddr;
serverAddr.sin_family = AF_INET;
uint32_t ipn = htonl(ipPort.ip);
memcpy(&serverAddr.sin_addr.s_addr, &ipn, sizeof(ipn));
serverAddr.sin_port = htons(ipPort.port);
if (bind(sock, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) != 0) {
char ip[INET_ADDRSTRLEN];
throw SYSCALL_EXCEPTION("cannot bind socket to addr %s:%s", inet_ntop(AF_INET, &serverAddr.sin_addr, ip, INET_ADDRSTRLEN), ipPort.port);
}
{
socklen_t addrLen = sizeof(serverAddr);
if (getsockname(sock, (struct sockaddr*)&serverAddr, &addrLen) < 0) {
throw SYSCALL_EXCEPTION("getsockname");
}
}
// stats are ~50byte, and are the most common request, say 100byte
// per request on average, 1MiB buffer should be enough for 10k requests
// or so in each of the two queues.
{
int bufSize = 1<<20;
if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (void*)&bufSize, sizeof(bufSize)) < 0) {
throw SYSCALL_EXCEPTION("setsockopt");
}
}
// store addresses/fds
_shared.ips[i].store(ntohl(serverAddr.sin_addr.s_addr), std::memory_order_release);
_shared.ports[i].store(ntohs(serverAddr.sin_port), std::memory_order_release);
_shared.socks[i].fd = sock;
_shared.socks[i].events = POLL_IN;
LOG_INFO(_env, "Bound shard %s to %s", _shid, serverAddr);
_recvBuf[i].resize(DEFAULT_UDP_MTU*MAX_RECV_MSGS);
_recvHdrs[i].resize(MAX_RECV_MSGS);
memset(_recvHdrs[i].data(), 0, sizeof(_recvHdrs[i][0])*MAX_RECV_MSGS);
_recvAddrs[i].resize(MAX_RECV_MSGS);
_recvVecs[i].resize(MAX_RECV_MSGS);
for (int j = 0; j < _recvVecs[i].size(); j++) {
_recvVecs[i][j].iov_base = &_recvBuf[i][j*DEFAULT_UDP_MTU];
_recvVecs[i][j].iov_len = DEFAULT_UDP_MTU;
_recvHdrs[i][j].msg_hdr.msg_iov = &_recvVecs[i][j];
_recvHdrs[i][j].msg_hdr.msg_iovlen = 1;
_recvHdrs[i][j].msg_hdr.msg_namelen = sizeof(_recvAddrs[i][j]);
_recvHdrs[i][j].msg_hdr.msg_name = &_recvAddrs[i][j];
}
}
}
void _handleRequest(int sockIx, struct sockaddr_in* clientAddr, char* buf, size_t len) {
LOG_DEBUG(_env, "received message from %s", *clientAddr);
BincodeBuf reqBbuf(buf, len);
// First, try to parse the header
ShardRequestHeader reqHeader;
try {
reqHeader.unpack(reqBbuf);
} catch (const BincodeException& err) {
LOG_ERROR(_env, "Could not parse: %s", err.what());
RAISE_ALERT(_env, "could not parse request header from %s, dropping it.", *clientAddr);
return;
}
if (wyhash64(&_packetDropRand) % 10'000 < _incomingPacketDropProbability) {
LOG_DEBUG(_env, "artificially dropping request %s", reqHeader.requestId);
return;
}
auto t0 = eggsNow();
LOG_DEBUG(_env, "received request id %s, kind %s, from %s", reqHeader.requestId, reqHeader.kind, *clientAddr);
// If this will be filled in with an actual code, it means that we couldn't process
// the request.
EggsError err = NO_ERROR;
// Now, try to parse the body
try {
_reqContainer.unpack(reqBbuf, reqHeader.kind);
if (bigRequest(reqHeader.kind)) {
if (unlikely(_env._shouldLog(LogLevel::LOG_TRACE))) {
LOG_TRACE(_env, "parsed request: %s", _reqContainer);
} else {
LOG_DEBUG(_env, "parsed request: <omitted>");
}
} else {
LOG_DEBUG(_env, "parsed request: %s", _reqContainer);
}
} catch (const BincodeException& exc) {
LOG_ERROR(_env, "Could not parse: %s", exc.what());
RAISE_ALERT(_env, "could not parse request of kind %s from %s, will reply with error.", reqHeader.kind, *clientAddr);
err = EggsError::MALFORMED_REQUEST;
}
// authenticate, if necessary
if (isPrivilegedRequestKind(reqHeader.kind)) {
auto expectedMac = cbcmac(_expandedCDCKey, reqBbuf.data, reqBbuf.cursor - reqBbuf.data);
BincodeFixedBytes<8> receivedMac;
reqBbuf.unpackFixedBytes<8>(receivedMac);
if (expectedMac != receivedMac.data) {
err = EggsError::NOT_AUTHORISED;
}
}
// Make sure nothing is left
if (unlikely(err == NO_ERROR && reqBbuf.remaining() != 0)) {
RAISE_ALERT(_env, "%s bytes remaining after parsing request of kind %s from %s, will reply with error", reqBbuf.remaining(), reqHeader.kind, *clientAddr);
err = EggsError::MALFORMED_REQUEST;
}
// At this point, if it's a read request, we can process it,
// if it's a write request we prepare the log entry and
// send it off.
if (likely(err == NO_ERROR)) {
if (readOnlyShardReq(_reqContainer.kind())) {
err = _shared.db.read(_reqContainer, _respContainer);
} else {
auto& entry = _logEntries.emplace_back();
entry.sockIx = sockIx;
entry.clientAddr = *clientAddr;
entry.receivedAt = t0;
entry.requestKind = reqHeader.kind;
entry.requestId = reqHeader.requestId;
err = _shared.db.prepareLogEntry(_reqContainer, entry.logEntry);
if (likely(err == NO_ERROR)) {
return; // we're done here, move along
} else {
_logEntries.pop_back(); // back out the log entry
}
}
}
Duration elapsed = eggsNow() - t0;
bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
packResponse(_env, _shared, _sendBuf, _sendHdrs[sockIx], _sendVecs[sockIx], reqHeader.requestId, reqHeader.kind, elapsed, dropArtificially, clientAddr, sockIx, err, _respContainer);
}
public:
virtual void step() override {
if (unlikely(!_shared.blockServicesWritten)) {
(100_ms).sleepRetry();
return;
}
_logEntries.clear();
_sendBuf.clear();
for (int i = 0; i < 2; i++) {
_sendHdrs[i].clear();
_sendVecs[i].clear();
}
if (unlikely(poll(_shared.socks.data(), 1 + (_shared.socks[1].fd != 0)) < 0)) {
if (errno == EINTR) { return; }
throw SYSCALL_EXCEPTION("poll");
}
for (int sockIx = 0; sockIx < _shared.socks.size(); sockIx++) {
const auto& sock = _shared.socks[sockIx];
if (!(sock.revents & POLLIN)) { continue; }
int msgs = recvmmsg(_shared.socks[sockIx].fd, &_recvHdrs[sockIx][0], _recvHdrs[sockIx].size(), MSG_DONTWAIT, nullptr);
if (unlikely(msgs < 0)) { // we know we have data from poll, we won't get EAGAIN
throw SYSCALL_EXCEPTION("recvmmsgs");
}
LOG_DEBUG(_env, "received %s messages from socket %s", msgs, sockIx);
if (msgs > 0) {
_shared.receivedRequests[sockIx] = _shared.receivedRequests[sockIx]*0.95 + ((double)msgs)*0.05;
}
for (int msgIx = 0; msgIx < msgs; msgIx++) {
auto& hdr = _recvHdrs[sockIx][msgIx];
auto clientAddr = (struct sockaddr_in *)hdr.msg_hdr.msg_name;
_handleRequest(sockIx, clientAddr, (char*)hdr.msg_hdr.msg_iov->iov_base, hdr.msg_len);
}
}
// write out write requests to queue
{
size_t numLogEntries = _logEntries.size();
if (numLogEntries > 0) {
LOG_DEBUG(_env, "pushing %s log entries to writer", numLogEntries);
uint32_t pushed;
{
std::lock_guard guard(_shared.logEntriesQueuePushLock);
pushed = _shared.logEntriesQueue.push(_logEntries);
}
_shared.logEntriesQueueSize = _shared.logEntriesQueueSize*0.95 + _shared.logEntriesQueue.size()*0.05;
if (pushed < numLogEntries) {
LOG_INFO(_env, "tried to push %s elements to write queue, but pushed %s instead", numLogEntries, pushed);
}
}
}
// write out read responses to UDP
for (int i = 0; i < 2; i++) {
if (_sendHdrs[i].size() == 0) { continue; }
LOG_DEBUG(_env, "sending %s read responses to sock %s", _sendHdrs[i].size(), i);
for (int j = 0; j < _sendHdrs[i].size(); j++) {
auto& vec = _sendVecs[i][j];
vec.iov_base = &_sendBuf[(size_t)vec.iov_base];
auto& hdr = _sendHdrs[i][j];
hdr.msg_hdr.msg_iov = &vec;
}
int ret = sendmmsg(_shared.socks[i].fd, &_sendHdrs[i][0], _sendHdrs[i].size(), 0);
if (unlikely(ret < 0)) {
// we get EPERM when nf drops packets
if (errno == EPERM) {
LOG_INFO(_env, "we got EPERM when trying to send %s messages, will drop them", _sendHdrs[i].size());
} else {
throw SYSCALL_EXCEPTION("sendto");
}
} else if (unlikely(ret < _sendHdrs[i].size())) {
LOG_INFO(_env, "dropping %s out of %s requests since `sendmmsg` could not send them all", _sendHdrs[i].size()-ret, _sendHdrs[i].size());
}
}
}
};
struct ShardWriter : Loop {
private:
ShardShared& _shared;
uint64_t _currentLogIndex;
ShardRespContainer _respContainer;
std::vector<QueuedShardLogEntry> _logEntries;
// sendmmsg data (one per socket)
std::vector<char> _sendBuf;
std::array<std::vector<struct mmsghdr>, 2> _sendHdrs; // one per socket
std::array<std::vector<struct iovec>, 2> _sendVecs;
uint64_t _packetDropRand;
uint64_t _incomingPacketDropProbability; // probability * 10,000
uint64_t _outgoingPacketDropProbability; // probability * 10,000
struct WriterThread : LoopThread {
ShardShared& shared;
WriterThread(pthread_t thread_, ShardShared& shared_) : LoopThread(thread_), shared(shared_) {}
virtual ~WriterThread() = default;
WriterThread(const WriterThread&) = delete;
virtual void stop() {
LoopThread::stop();
shared.logEntriesQueue.close();
}
};
public:
ShardWriter(Logger& logger, std::shared_ptr<XmonAgent>& xmon, const ShardOptions& options, ShardShared& shared) :
Loop(logger, xmon, "writer"),
_shared(shared),
_packetDropRand(eggsNow().ns),
_incomingPacketDropProbability(0),
_outgoingPacketDropProbability(0)
{
_currentLogIndex = _shared.db.lastAppliedLogEntry();
auto convertProb = [this](const std::string& what, double prob, uint64_t& iprob) {
if (prob != 0.0) {
LOG_INFO(_env, "Will drop %s%% of %s packets", prob*100.0, what);
iprob = prob * 10'000.0;
ALWAYS_ASSERT(iprob > 0 && iprob < 10'000);
}
};
convertProb("incoming", options.simulateIncomingPacketDrop, _incomingPacketDropProbability);
convertProb("outgoing", options.simulateOutgoingPacketDrop, _outgoingPacketDropProbability);
_logEntries.reserve(MAX_WRITES_AT_ONCE);
}
virtual ~ShardWriter() = default;
// We need a special one since we're waiting on a futex, not with poll
static std::unique_ptr<LoopThread> SpawnWriter(std::unique_ptr<ShardWriter>&& loop) {
ShardShared& shared = loop->_shared;
return std::make_unique<WriterThread>(std::move(Loop::Spawn(std::move(loop))->thread), shared);
}
virtual void step() override {
_logEntries.clear();
_sendBuf.clear();
for (int i = 0; i < _sendHdrs.size(); i++) {
_sendHdrs[i].clear();
_sendVecs[i].clear();
}
uint32_t pulled = _shared.logEntriesQueue.pull(_logEntries, MAX_WRITES_AT_ONCE);
if (likely(pulled > 0)) {
LOG_DEBUG(_env, "pulled %s requests from write queue", pulled);
_shared.pulledWriteRequests = _shared.pulledWriteRequests*0.95 + ((double)pulled)*0.05;
} else {
// queue is closed, stop
stop();
}
for (auto& logEntry : _logEntries) {
if (likely(logEntry.requestId)) {
LOG_DEBUG(_env, "applying log entry for request %s kind %s from %s", logEntry.requestId, logEntry.requestKind, logEntry.clientAddr);
} else {
LOG_DEBUG(_env, "applying request-less log entry");
}
_currentLogIndex++;
EggsError err = _shared.db.applyLogEntry(logEntry.requestKind, _currentLogIndex, logEntry.logEntry, _respContainer);
if (likely(logEntry.requestId)) {
Duration elapsed = eggsNow() - logEntry.receivedAt;
bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
packResponse(_env, _shared, _sendBuf, _sendHdrs[logEntry.sockIx], _sendVecs[logEntry.sockIx], logEntry.requestId, logEntry.requestKind, elapsed, dropArtificially, &logEntry.clientAddr, logEntry.sockIx, err, _respContainer);
} else if (unlikely(err != NO_ERROR)) {
RAISE_ALERT(_env, "could not apply request-less log entry: %s", err);
}
// We can do this before we flush: the other writes will see this
// write already, which is what matters.
if (logEntry.logEntry.body.kind() == ShardLogEntryKind::UPDATE_BLOCK_SERVICES) {
LOG_INFO(_env, "applied block service update");
_shared.blockServicesWritten = true;
}
}
if (pulled > 0) {
LOG_DEBUG(_env, "flushing and sending %s writes", pulled);
_shared.db.flush(true);
// important to send all of them after the flush! otherwise it's not durable yet
for (int i = 0; i < _sendHdrs.size(); i++) {
if (_sendHdrs[i].size() == 0) { continue; }
LOG_DEBUG(_env, "sending %s write responses to socket %s", _sendHdrs[i].size(), i);
for (int j = 0; j < _sendHdrs[i].size(); j++) {
auto& vec = _sendVecs[i][j];
vec.iov_base = &_sendBuf[(size_t)vec.iov_base];
auto& hdr = _sendHdrs[i][j];
hdr.msg_hdr.msg_iov = &vec;
}
int ret = sendmmsg(_shared.socks[i].fd, &_sendHdrs[i][0], _sendHdrs[i].size(), 0);
if (unlikely(ret < 0)) {
// we get this when nf drops packets
if (errno != EPERM) {
throw SYSCALL_EXCEPTION("sendto");
} else {
LOG_INFO(_env, "dropping %s responses because of EPERM", _sendHdrs[i].size());
}
} else if (unlikely(ret < _sendHdrs[i].size())) {
LOG_INFO(_env, "dropping %s out of %s requests since `sendmmsg` could not send them all", _sendHdrs[i].size()-ret, _sendHdrs[i].size());
}
}
}
}
};
struct ShardRegisterer : PeriodicLoop {
private:
ShardShared& _shared;
Stopper _stopper;
ShardId _shid;
std::string _shuckleHost;
uint16_t _shucklePort;
bool _hasSecondIp;
XmonNCAlert _alert;
public:
ShardRegisterer(Logger& logger, std::shared_ptr<XmonAgent>& xmon, ShardId shid, const ShardOptions& options, ShardShared& shared) :
PeriodicLoop(logger, xmon, "registerer", {1_sec, 1_mins}),
_shared(shared),
_shid(shid),
_shuckleHost(options.shuckleHost),
_shucklePort(options.shucklePort),
_hasSecondIp(options.ipPorts[1].port != 0)
{}
virtual ~ShardRegisterer() = default;
void init() {
_env.updateAlert(_alert, "Waiting to register ourselves for the first time");
}
virtual bool periodicStep() {
uint16_t port1 = _shared.ports[0].load();
uint16_t port2 = _shared.ports[1].load();
// Avoid registering with only one port, so that clients can just wait on
// the first port being ready and they always have both.
if (port1 == 0 || (_hasSecondIp && port2 == 0)) {
// shard server isn't up yet
return false;
}
uint32_t ip1 = _shared.ips[0].load();
uint32_t ip2 = _shared.ips[1].load();
LOG_INFO(_env, "Registering ourselves (shard %s, %s:%s, %s:%s) with shuckle", _shid, in_addr{htonl(ip1)}, port1, in_addr{htonl(ip2)}, port2);
std::string err = registerShard(_shuckleHost, _shucklePort, 10_sec, _shid, ip1, port1, ip2, port2);
if (!err.empty()) {
_env.updateAlert(_alert, "Couldn't register ourselves with shuckle: %s", err);
return false;
}
_env.clearAlert(_alert);
return true;
}
};
struct ShardBlockServiceUpdater : PeriodicLoop {
private:
ShardShared& _shared;
ShardId _shid;
std::string _shuckleHost;
uint16_t _shucklePort;
XmonNCAlert _alert;
ShardRespContainer _respContainer;
std::vector<QueuedShardLogEntry> _logEntries;
public:
ShardBlockServiceUpdater(Logger& logger, std::shared_ptr<XmonAgent>& xmon, ShardId shid, const ShardOptions& options, ShardShared& shared):
PeriodicLoop(logger, xmon, "bs_updater", {1_sec, 1_mins}),
_shared(shared),
_shid(shid),
_shuckleHost(options.shuckleHost),
_shucklePort(options.shucklePort)
{
_env.updateAlert(_alert, "Waiting to fetch block services for the first time");
}
virtual bool periodicStep() override {
LOG_INFO(_env, "about to fetch block services from %s:%s", _shuckleHost, _shucklePort);
_logEntries.clear();
auto& logEntry = _logEntries.emplace_back();
logEntry.logEntry.time = eggsNow();
auto& blockServicesEntry = logEntry.logEntry.body.setUpdateBlockServices();
std::string err = fetchBlockServices(_shuckleHost, _shucklePort, 10_sec, _shid, blockServicesEntry);
if (!err.empty()) {
_env.updateAlert(_alert, "could not reach shuckle: %s", err);
return false;
}
if (blockServicesEntry.blockServices.els.empty()) {
_env.updateAlert(_alert, "got no block services");
return false;
}
{
// The scheme below is a very cheap way to always pick different failure domains
// for our block services: we just set the current block services to be all of
// different failure domains, sharded by storage type.
//
// It does require having at least 14 failure domains (to do RS(10,4)), which is
// easy right now since we have ~100 failure domains in iceland.
// storage class -> failure domain -> block service ids
std::unordered_map<uint8_t, std::unordered_map<__int128, std::vector<const BlockServiceInfo*>>> blockServicesByFailureDomain;
for (const auto& blockService: logEntry.logEntry.body.getUpdateBlockServices().blockServices.els) {
if (blockService.flags & BLOCK_SERVICE_DONT_WRITE) { continue; }
__int128 failureDomain;
static_assert(sizeof(failureDomain) == sizeof(blockService.failureDomain));
memcpy(&failureDomain, &blockService.failureDomain.name.data[0], sizeof(failureDomain));
blockServicesByFailureDomain[blockService.storageClass][failureDomain].emplace_back(&blockService);
}
uint64_t rand = wyhash64_rand();
for (const auto& [storageClass, byFailureDomain]: blockServicesByFailureDomain) {
for (const auto& [failureDomain, blockServices]: byFailureDomain) {
blockServicesEntry.currentBlockServices.els.emplace_back(blockServices[wyhash64(&rand)%blockServices.size()]->id);
}
}
}
for (;;) {
uint32_t pushed;
{
std::lock_guard guard(_shared.logEntriesQueuePushLock);
pushed = _shared.logEntriesQueue.push(_logEntries);
}
if (unlikely(pushed == 0)) {
_env.updateAlert(_alert, "could not push update block services log entry to queue, will try again");
(100_ms).sleepRetry();
} else {
break;
}
}
LOG_DEBUG(_env, "pushed block block service update");
_env.clearAlert(_alert);
return true;
}
};
struct ShardStatsInserter : PeriodicLoop {
private:
ShardShared& _shared;
ShardId _shid;
std::string _shuckleHost;
uint16_t _shucklePort;
XmonNCAlert _alert;
std::vector<Stat> _stats;
public:
ShardStatsInserter(Logger& logger, std::shared_ptr<XmonAgent>& xmon, ShardId shid, const ShardOptions& options, ShardShared& shared):
PeriodicLoop(logger, xmon, "stats", {1_mins, 1_hours}),
_shared(shared),
_shid(shid),
_shuckleHost(options.shuckleHost),
_shucklePort(options.shucklePort)
{}
virtual ~ShardStatsInserter() = default;
virtual bool periodicStep() override {
for (ShardMessageKind kind : allShardMessageKind) {
std::ostringstream prefix;
prefix << "shard." << std::setw(3) << std::setfill('0') << _shid << "." << kind;
_shared.timings[(int)kind].toStats(prefix.str(), _stats);
_shared.errors[(int)kind].toStats(prefix.str(), _stats);
}
LOG_INFO(_env, "inserting stats");
std::string err = insertStats(_shuckleHost, _shucklePort, 10_sec, _stats);
_stats.clear();
if (err.empty()) {
_env.clearAlert(_alert);
for (ShardMessageKind kind : allShardMessageKind) {
_shared.timings[(int)kind].reset();
_shared.errors[(int)kind].reset();
}
return true;
} else {
_env.updateAlert(_alert, "Could not insert stats: %s", err);
return false;
}
}
// TODO restore this when we have the functionality to do so
// virtual void finish() override {
// LOG_INFO(_env, "insert stats one last time");
// periodicStep();
// }
};
struct ShardMetricsInserter : PeriodicLoop {
private:
ShardShared& _shared;
ShardId _shid;
XmonNCAlert _alert;
MetricsBuilder _metricsBuilder;
std::unordered_map<std::string, uint64_t> _rocksDBStats;
public:
ShardMetricsInserter(Logger& logger, std::shared_ptr<XmonAgent>& xmon, ShardId shid, ShardShared& shared):
PeriodicLoop(logger, xmon, "metrics", {1_sec, 1.0, 1_mins, 0.1}),
_shared(shared),
_shid(shid)
{}
virtual ~ShardMetricsInserter() = default;
virtual bool periodicStep() {
_shared.db.dumpRocksDBStatistics();
auto now = eggsNow();
for (ShardMessageKind kind : allShardMessageKind) {
const ErrorCount& errs = _shared.errors[(int)kind];
for (int i = 0; i < errs.count.size(); i++) {
uint64_t count = errs.count[i].load();
if (count == 0) { continue; }
_metricsBuilder.measurement("eggsfs_shard_requests");
_metricsBuilder.tag("shard", _shid);
_metricsBuilder.tag("kind", kind);
_metricsBuilder.tag("write", !readOnlyShardReq(kind));
if (i == 0) {
_metricsBuilder.tag("error", "NO_ERROR");
} else {
_metricsBuilder.tag("error", (EggsError)i);
}
_metricsBuilder.fieldU64("count", count);
_metricsBuilder.timestamp(now);
}
}
{
_metricsBuilder.measurement("eggsfs_shard_write_queue");
_metricsBuilder.tag("shard", _shid);
_metricsBuilder.fieldFloat("size", _shared.logEntriesQueueSize);
_metricsBuilder.timestamp(now);
}
for (int i = 0; i < _shared.receivedRequests.size(); i++) {
_metricsBuilder.measurement("eggsfs_shard_received_requests");
_metricsBuilder.tag("shard", _shid);
_metricsBuilder.tag("socket", i);
_metricsBuilder.fieldFloat("count", _shared.receivedRequests[i]);
_metricsBuilder.timestamp(now);
}
{
_metricsBuilder.measurement("eggsfs_shard_pulled_write_requests");
_metricsBuilder.tag("shard", _shid);
_metricsBuilder.fieldFloat("count", _shared.pulledWriteRequests);
_metricsBuilder.timestamp(now);
}
{
_rocksDBStats.clear();
_shared.db.rocksDBMetrics(_rocksDBStats);
for (const auto& [name, value]: _rocksDBStats) {
_metricsBuilder.measurement("eggsfs_shard_rocksdb");
_metricsBuilder.tag("shard", _shid);
_metricsBuilder.fieldU64(name, value);
_metricsBuilder.timestamp(now);
}
}
std::string err = sendMetrics(10_sec, _metricsBuilder.payload());
_metricsBuilder.reset();
if (err.empty()) {
LOG_INFO(_env, "Sent metrics to influxdb");
_env.clearAlert(_alert);
return true;
} else {
_env.updateAlert(_alert, "Could not insert metrics: %s", err);
return false;
}
}
};
void runShard(ShardId shid, const std::string& dbDir, const ShardOptions& options) {
int logOutFd = STDOUT_FILENO;
if (!options.logFile.empty()) {
logOutFd = open(options.logFile.c_str(), O_WRONLY|O_CREAT|O_APPEND, 0644);
if (logOutFd < 0) {
throw SYSCALL_EXCEPTION("open");
}
}
Logger logger(options.logLevel, logOutFd, options.syslog, true);
std::shared_ptr<XmonAgent> xmon;
if (options.xmon) {
xmon = std::make_shared<XmonAgent>();
}
Env env(logger, xmon, "startup");
{
LOG_INFO(env, "Running shard %s with options:", shid);
LOG_INFO(env, " level = %s", options.logLevel);
LOG_INFO(env, " logFile = '%s'", options.logFile);
LOG_INFO(env, " shuckleHost = '%s'", options.shuckleHost);
LOG_INFO(env, " shucklePort = %s", options.shucklePort);
for (int i = 0; i < 2; i++) {
LOG_INFO(env, " port%s = %s", i+1, options.ipPorts[0].port);
{
char ip[INET_ADDRSTRLEN];
uint32_t ipN = options.ipPorts[i].ip;
LOG_INFO(env, " ownIp%s = %s", i+1, inet_ntop(AF_INET, &ipN, ip, INET_ADDRSTRLEN));
}
}
LOG_INFO(env, " simulateIncomingPacketDrop = %s", options.simulateIncomingPacketDrop);
LOG_INFO(env, " simulateOutgoingPacketDrop = %s", options.simulateOutgoingPacketDrop);
LOG_INFO(env, " syslog = %s", (int)options.syslog);
}
// Immediately start xmon: we want the database initializing update to
// be there.
std::vector<std::unique_ptr<LoopThread>> threads;
if (xmon) {
XmonConfig config;
{
std::ostringstream ss;
ss << std::setw(3) << std::setfill('0') << shid;
config.appInstance = "eggsshard" + ss.str();
}
config.prod = options.xmonProd;
config.appType = "restech_eggsfs.critical";
threads.emplace_back(Loop::Spawn(std::make_unique<Xmon>(logger, xmon, config)));
}
// then everything else
XmonNCAlert dbInitAlert;
env.updateAlert(dbInitAlert, "initializing database");
ShardDB db(logger, xmon, shid, options.transientDeadlineInterval, dbDir);
env.clearAlert(dbInitAlert);
ShardShared shared(db);
threads.emplace_back(Loop::Spawn(std::make_unique<ShardServer>(logger, xmon, shid, options, shared)));
threads.emplace_back(ShardWriter::SpawnWriter(std::make_unique<ShardWriter>(logger, xmon, options, shared)));
threads.emplace_back(Loop::Spawn(std::make_unique<ShardRegisterer>(logger, xmon, shid, options, shared)));
threads.emplace_back(Loop::Spawn(std::make_unique<ShardBlockServiceUpdater>(logger, xmon, shid, options, shared)));
threads.emplace_back(Loop::Spawn(std::make_unique<ShardStatsInserter>(logger, xmon, shid, options, shared)));
if (options.metrics) {
threads.emplace_back(Loop::Spawn(std::make_unique<ShardMetricsInserter>(logger, xmon, shid, shared)));
}
// from this point on termination on SIGINT/SIGTERM will be graceful
waitUntilStopped(threads);
db.close();
LOG_INFO(env, "Shard terminating gracefully, bye.");
}