#include #include #include #include #include #include #include #include #include #include #include #include #include #include "Assert.hpp" #include "Bincode.hpp" #include "Common.hpp" #include "Crypto.hpp" #include "Exception.hpp" #include "Msgs.hpp" #include "MsgsGen.hpp" #include "Shard.hpp" #include "Env.hpp" #include "ShardDB.hpp" #include "CDCKey.hpp" #include "Shuckle.hpp" #include "Time.hpp" #include "Time.hpp" #include "wyhash.h" #include "Xmon.hpp" #include "Timings.hpp" #include "ErrorCount.hpp" #include "PeriodicLoop.hpp" #include "Metrics.hpp" #include "Loop.hpp" #include "SPSC.hpp" struct QueuedShardLogEntry { ShardLogEntry logEntry; // if requestId == 0, the rest is all garbage -- we don't need it. // this is for log entries that are not generated by users (e.g. // block service updates). uint64_t requestId; EggsTime receivedAt; struct sockaddr_in clientAddr; int sockIx; // which sock to use to reply ShardMessageKind requestKind; }; // TODO make options const int LOG_ENTRIES_QUEUE_SIZE = 8192; // a few megabytes, should be quite a bit bigger than the below const int RECVMMSG_LEN = 1000; // how many messages to read at once at most const int MAX_WRITES_AT_ONCE = 200; // say that each write is ~100bytes, this gives us 20KB of write const int MAX_RECV_MSGS = 100; struct ShardShared { ShardDB& db; std::array, 2> ips; std::array, 2> ports; std::array socks; std::atomic blockServicesWritten; std::array timings; std::array errors; SPSC logEntriesQueue; std::mutex logEntriesQueuePushLock; // almost always uncontended std::atomic logEntriesQueueSize; std::array, 2> receivedRequests; // how many requests we got at once from each socket std::atomic pulledWriteRequests; // how many requests we got from write queue ShardShared() = delete; ShardShared(ShardDB& db_): db(db_), ips{0, 0}, ports{0, 0}, blockServicesWritten(false), logEntriesQueue(LOG_ENTRIES_QUEUE_SIZE), logEntriesQueueSize(0), pulledWriteRequests(0) { for (ShardMessageKind kind : allShardMessageKind) { timings[(int)kind] = Timings::Standard(); } for (auto& x: receivedRequests) { x = 0; } } }; static bool bigRequest(ShardMessageKind kind) { return unlikely( kind == ShardMessageKind::ADD_SPAN_INITIATE || kind == ShardMessageKind::ADD_SPAN_CERTIFY ); } static bool bigResponse(ShardMessageKind kind) { return unlikely( kind == ShardMessageKind::READ_DIR || kind == ShardMessageKind::ADD_SPAN_INITIATE || kind == ShardMessageKind::FILE_SPANS || kind == ShardMessageKind::VISIT_DIRECTORIES || kind == ShardMessageKind::VISIT_FILES || kind == ShardMessageKind::VISIT_TRANSIENT_FILES || kind == ShardMessageKind::BLOCK_SERVICE_FILES || kind == ShardMessageKind::FULL_READ_DIR ); } static void packResponse( Env& env, ShardShared& shared, std::vector& sendBuf, std::vector& sendHdrs, std::vector& sendVecs, uint64_t requestId, ShardMessageKind kind, Duration elapsed, bool dropArtificially, struct sockaddr_in* clientAddr, int sockIx, EggsError err, const ShardRespContainer& resp ) { // pack into sendBuf size_t sendBufBegin = sendBuf.size(); sendBuf.resize(sendBufBegin + MAX_UDP_MTU); BincodeBuf respBbuf(&sendBuf[sendBufBegin], MAX_UDP_MTU); if (err == NO_ERROR) { LOG_DEBUG(env, "successfully processed request %s with kind %s in %s", requestId, kind, elapsed); if (bigResponse(kind)) { if (unlikely(env._shouldLog(LogLevel::LOG_TRACE))) { LOG_TRACE(env, "resp body: %s", resp); } else { LOG_DEBUG(env, "resp body: "); } } else { LOG_DEBUG(env, "resp body: %s", resp); } ShardResponseHeader(requestId, kind).pack(respBbuf); resp.pack(respBbuf); } else { LOG_DEBUG(env, "request %s failed with error %s in %s", kind, err, elapsed); ShardResponseHeader(requestId, ShardMessageKind::ERROR).pack(respBbuf); respBbuf.packScalar((uint16_t)err); } sendBuf.resize(sendBufBegin + respBbuf.len()); shared.timings[(int)kind].add(elapsed); shared.errors[(int)kind].add(err); if (unlikely(dropArtificially)) { LOG_DEBUG(env, "artificially dropping response %s", requestId); sendBuf.resize(sendBufBegin); return; } LOG_DEBUG(env, "will send response for req id %s kind %s to %s", requestId, kind, *clientAddr); // Prepare sendmmsg stuff. The vectors might be resized by the // time we get to sending this, so store references when we must // -- we'll fix up the actual values later. auto& hdr = sendHdrs.emplace_back(); hdr.msg_hdr = { .msg_name = clientAddr, .msg_namelen = sizeof(*clientAddr), .msg_iovlen = 1, }; hdr.msg_len = respBbuf.len(); auto& vec = sendVecs.emplace_back(); vec.iov_base = (void*)sendBufBegin; vec.iov_len = respBbuf.len(); } struct ShardServer : Loop { private: // init data ShardShared& _shared; bool _initialized; ShardId _shid; std::array _ipPorts; uint64_t _packetDropRand; uint64_t _incomingPacketDropProbability; // probability * 10,000 uint64_t _outgoingPacketDropProbability; // probability * 10,000 // run data AES128Key _expandedCDCKey; // recvmmsg data (one per socket, since we receive from both and send from both // using some of the header data) std::array, 2> _recvBuf; std::array, 2> _recvHdrs; std::array, 2> _recvAddrs; std::array, 2> _recvVecs; // what we parse into ShardReqContainer _reqContainer; ShardRespContainer _respContainer; // log entries buffers std::vector _logEntries; // sendmmsg data std::vector _sendBuf; std::array, 2> _sendHdrs; // one per socket std::array, 2> _sendVecs; public: ShardServer(Logger& logger, std::shared_ptr& xmon, ShardId shid, const ShardOptions& options, ShardShared& shared) : Loop(logger, xmon, "server"), _shared(shared), _initialized(false), _shid(shid), _ipPorts(options.ipPorts), _packetDropRand(eggsNow().ns), _incomingPacketDropProbability(0), _outgoingPacketDropProbability(0) { auto convertProb = [this](const std::string& what, double prob, uint64_t& iprob) { if (prob != 0.0) { LOG_INFO(_env, "Will drop %s%% of %s packets", prob*100.0, what); iprob = prob * 10'000.0; ALWAYS_ASSERT(iprob > 0 && iprob < 10'000); } }; convertProb("incoming", options.simulateIncomingPacketDrop, _incomingPacketDropProbability); convertProb("outgoing", options.simulateOutgoingPacketDrop, _outgoingPacketDropProbability); } void init() { LOG_INFO(_env, "initializing server sockets"); expandKey(CDCKey, _expandedCDCKey); memset(_shared.socks.data(), 0, _shared.socks.size()*sizeof(_shared.socks[0])); for (int i = 0; i < _ipPorts.size(); i++) { const auto& ipPort = _ipPorts[i]; if (ipPort.ip == 0) { break; } int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if (sock < 0) { throw SYSCALL_EXCEPTION("cannot create socket"); } struct sockaddr_in serverAddr; serverAddr.sin_family = AF_INET; uint32_t ipn = htonl(ipPort.ip); memcpy(&serverAddr.sin_addr.s_addr, &ipn, sizeof(ipn)); serverAddr.sin_port = htons(ipPort.port); if (bind(sock, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) != 0) { char ip[INET_ADDRSTRLEN]; throw SYSCALL_EXCEPTION("cannot bind socket to addr %s:%s", inet_ntop(AF_INET, &serverAddr.sin_addr, ip, INET_ADDRSTRLEN), ipPort.port); } { socklen_t addrLen = sizeof(serverAddr); if (getsockname(sock, (struct sockaddr*)&serverAddr, &addrLen) < 0) { throw SYSCALL_EXCEPTION("getsockname"); } } // stats are ~50byte, and are the most common request, say 100byte // per request on average, 1MiB buffer should be enough for 10k requests // or so in each of the two queues. { int bufSize = 1<<20; if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (void*)&bufSize, sizeof(bufSize)) < 0) { throw SYSCALL_EXCEPTION("setsockopt"); } } // store addresses/fds _shared.ips[i].store(ntohl(serverAddr.sin_addr.s_addr), std::memory_order_release); _shared.ports[i].store(ntohs(serverAddr.sin_port), std::memory_order_release); _shared.socks[i].fd = sock; _shared.socks[i].events = POLL_IN; LOG_INFO(_env, "Bound shard %s to %s", _shid, serverAddr); _recvBuf[i].resize(DEFAULT_UDP_MTU*MAX_RECV_MSGS); _recvHdrs[i].resize(MAX_RECV_MSGS); memset(_recvHdrs[i].data(), 0, sizeof(_recvHdrs[i][0])*MAX_RECV_MSGS); _recvAddrs[i].resize(MAX_RECV_MSGS); _recvVecs[i].resize(MAX_RECV_MSGS); for (int j = 0; j < _recvVecs[i].size(); j++) { _recvVecs[i][j].iov_base = &_recvBuf[i][j*DEFAULT_UDP_MTU]; _recvVecs[i][j].iov_len = DEFAULT_UDP_MTU; _recvHdrs[i][j].msg_hdr.msg_iov = &_recvVecs[i][j]; _recvHdrs[i][j].msg_hdr.msg_iovlen = 1; _recvHdrs[i][j].msg_hdr.msg_namelen = sizeof(_recvAddrs[i][j]); _recvHdrs[i][j].msg_hdr.msg_name = &_recvAddrs[i][j]; } } } virtual void step() override { if (unlikely(!_initialized)) { init(); _initialized = true; } if (unlikely(!_shared.blockServicesWritten)) { (100_ms).sleepRetry(); return; } _logEntries.clear(); _sendBuf.clear(); for (int i = 0; i < 2; i++) { _sendHdrs[i].clear(); _sendVecs[i].clear(); } if (unlikely(poll(_shared.socks.data(), 1 + (_shared.socks[1].fd != 0), -1) < 0)) { throw SYSCALL_EXCEPTION("poll"); } for (int sockIx = 0; sockIx < _shared.socks.size(); sockIx++) { const auto& sock = _shared.socks[sockIx]; if (!(sock.revents & POLLIN)) { continue; } int msgs = recvmmsg(_shared.socks[sockIx].fd, &_recvHdrs[sockIx][0], _recvHdrs[sockIx].size(), MSG_DONTWAIT, nullptr); if (unlikely(msgs < 0)) { // we know we have data from poll, we won't get EAGAIN throw SYSCALL_EXCEPTION("recvmmsgs"); } LOG_DEBUG(_env, "received %s messages from socket %s", msgs, sockIx); if (msgs > 0) { _shared.receivedRequests[sockIx] = _shared.receivedRequests[sockIx]*0.95 + ((double)msgs)*0.05; } for (int msgIx = 0; msgIx < msgs; msgIx++) { auto& hdr = _recvHdrs[sockIx][msgIx]; auto clientAddr = (struct sockaddr_in *)hdr.msg_hdr.msg_name; LOG_DEBUG(_env, "received message from %s", *clientAddr); BincodeBuf reqBbuf((char*)hdr.msg_hdr.msg_iov->iov_base, hdr.msg_len); // First, try to parse the header ShardRequestHeader reqHeader; try { reqHeader.unpack(reqBbuf); } catch (const BincodeException& err) { LOG_ERROR(_env, "Could not parse: %s", err.what()); RAISE_ALERT(_env, "could not parse request header from %s, dropping it.", *clientAddr); continue; } if (wyhash64(&_packetDropRand) % 10'000 < _incomingPacketDropProbability) { LOG_DEBUG(_env, "artificially dropping request %s", reqHeader.requestId); continue; } auto t0 = eggsNow(); LOG_DEBUG(_env, "received request id %s, kind %s, from %s", reqHeader.requestId, reqHeader.kind, *clientAddr); // If this will be filled in with an actual code, it means that we couldn't process // the request. EggsError err = NO_ERROR; // Now, try to parse the body try { _reqContainer.unpack(reqBbuf, reqHeader.kind); if (bigRequest(reqHeader.kind)) { if (unlikely(_env._shouldLog(LogLevel::LOG_TRACE))) { LOG_TRACE(_env, "parsed request: %s", _reqContainer); } else { LOG_DEBUG(_env, "parsed request: "); } } else { LOG_DEBUG(_env, "parsed request: %s", _reqContainer); } } catch (const BincodeException& exc) { LOG_ERROR(_env, "Could not parse: %s", exc.what()); RAISE_ALERT(_env, "could not parse request of kind %s from %s, will reply with error.", reqHeader.kind, *clientAddr); err = EggsError::MALFORMED_REQUEST; } // authenticate, if necessary if (isPrivilegedRequestKind(reqHeader.kind)) { auto expectedMac = cbcmac(_expandedCDCKey, reqBbuf.data, reqBbuf.cursor - reqBbuf.data); BincodeFixedBytes<8> receivedMac; reqBbuf.unpackFixedBytes<8>(receivedMac); if (expectedMac != receivedMac.data) { err = EggsError::NOT_AUTHORISED; } } // Make sure nothing is left if (unlikely(err == NO_ERROR && reqBbuf.remaining() != 0)) { RAISE_ALERT(_env, "%s bytes remaining after parsing request of kind %s from %s, will reply with error", reqBbuf.remaining(), reqHeader.kind, *clientAddr); err = EggsError::MALFORMED_REQUEST; } // At this point, if it's a read request, we can process it, // if it's a write request we prepare the log entry and // send it off. if (likely(err == NO_ERROR)) { if (readOnlyShardReq(_reqContainer.kind())) { err = _shared.db.read(_reqContainer, _respContainer); } else { auto& entry = _logEntries.emplace_back(); entry.sockIx = sockIx; entry.clientAddr = *clientAddr; entry.receivedAt = t0; entry.requestKind = reqHeader.kind; entry.requestId = reqHeader.requestId; err = _shared.db.prepareLogEntry(_reqContainer, entry.logEntry); if (likely(err == NO_ERROR)) { continue; // we're done here, move along } else { _logEntries.pop_back(); // back out the log entry } } } Duration elapsed = eggsNow() - t0; bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability; packResponse(_env, _shared, _sendBuf, _sendHdrs[sockIx], _sendVecs[sockIx], reqHeader.requestId, reqHeader.kind, elapsed, dropArtificially, clientAddr, sockIx, err, _respContainer); } } // write out write requests to queue { size_t numLogEntries = _logEntries.size(); if (numLogEntries > 0) { LOG_DEBUG(_env, "pushing %s log entries to writer", numLogEntries); uint32_t pushed; { std::lock_guard guard(_shared.logEntriesQueuePushLock); pushed = _shared.logEntriesQueue.push(_logEntries); } _shared.logEntriesQueueSize = _shared.logEntriesQueueSize*0.95 + _shared.logEntriesQueue.size()*0.05; if (pushed < numLogEntries) { LOG_INFO(_env, "tried to push %s elements to write queue, but pushed %s instead", numLogEntries, pushed); } } } // write out read responses to UDP for (int i = 0; i < 2; i++) { if (_sendHdrs[i].size() == 0) { continue; } LOG_DEBUG(_env, "sending %s read responses to sock %s", _sendHdrs[i].size(), i); for (int j = 0; j < _sendHdrs[i].size(); j++) { auto& vec = _sendVecs[i][j]; vec.iov_base = &_sendBuf[(size_t)vec.iov_base]; auto& hdr = _sendHdrs[i][j]; hdr.msg_hdr.msg_iov = &vec; } int ret = sendmmsg(_shared.socks[i].fd, &_sendHdrs[i][0], _sendHdrs[i].size(), 0); if (unlikely(ret < 0)) { // we get EPERM when nf drops packets if (errno == EPERM) { LOG_INFO(_env, "we got EPERM when trying to send %s messages, will drop them", _sendHdrs[i].size()); } else { throw SYSCALL_EXCEPTION("sendto"); } } else if (unlikely(ret < _sendHdrs[i].size())) { LOG_INFO(_env, "dropping %s out of %s requests since `sendmmsg` could not send them all", _sendHdrs[i].size()-ret, _sendHdrs[i].size()); } } } }; struct ShardWriter : Loop { private: ShardShared& _shared; uint64_t _currentLogIndex; ShardRespContainer _respContainer; std::vector _logEntries; // sendmmsg data (one per socket) std::vector _sendBuf; std::array, 2> _sendHdrs; // one per socket std::array, 2> _sendVecs; uint64_t _packetDropRand; uint64_t _incomingPacketDropProbability; // probability * 10,000 uint64_t _outgoingPacketDropProbability; // probability * 10,000 public: ShardWriter(Logger& logger, std::shared_ptr& xmon, const ShardOptions& options, ShardShared& shared) : Loop(logger, xmon, "writer"), _shared(shared), _packetDropRand(eggsNow().ns), _incomingPacketDropProbability(0), _outgoingPacketDropProbability(0) { _currentLogIndex = _shared.db.lastAppliedLogEntry(); auto convertProb = [this](const std::string& what, double prob, uint64_t& iprob) { if (prob != 0.0) { LOG_INFO(_env, "Will drop %s%% of %s packets", prob*100.0, what); iprob = prob * 10'000.0; ALWAYS_ASSERT(iprob > 0 && iprob < 10'000); } }; convertProb("incoming", options.simulateIncomingPacketDrop, _incomingPacketDropProbability); convertProb("outgoing", options.simulateOutgoingPacketDrop, _outgoingPacketDropProbability); _logEntries.reserve(MAX_WRITES_AT_ONCE); } virtual void step() override { _logEntries.clear(); _sendBuf.clear(); for (int i = 0; i < _sendHdrs.size(); i++) { _sendHdrs[i].clear(); _sendVecs[i].clear(); } uint32_t pulled = _shared.logEntriesQueue.pull(_logEntries, MAX_WRITES_AT_ONCE); if (pulled > 0) { LOG_DEBUG(_env, "pulled %s requests from write queue", pulled); _shared.pulledWriteRequests = _shared.pulledWriteRequests*0.95 + ((double)pulled)*0.05; } for (auto& logEntry : _logEntries) { if (likely(logEntry.requestId)) { LOG_DEBUG(_env, "applying log entry for request %s kind %s from %s", logEntry.requestId, logEntry.requestKind, logEntry.clientAddr); } else { LOG_DEBUG(_env, "applying request-less log entry"); } _currentLogIndex++; EggsError err = _shared.db.applyLogEntry(logEntry.requestKind, _currentLogIndex, logEntry.logEntry, _respContainer); if (likely(logEntry.requestId)) { Duration elapsed = eggsNow() - logEntry.receivedAt; bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability; packResponse(_env, _shared, _sendBuf, _sendHdrs[logEntry.sockIx], _sendVecs[logEntry.sockIx], logEntry.requestId, logEntry.requestKind, elapsed, dropArtificially, &logEntry.clientAddr, logEntry.sockIx, err, _respContainer); } else if (unlikely(err != NO_ERROR)) { RAISE_ALERT(_env, "could not apply request-less log entry: %s", err); } // We can do this before we flush: the other writes will see this // write already, which is what matters. if (logEntry.logEntry.body.kind() == ShardLogEntryKind::UPDATE_BLOCK_SERVICES) { LOG_INFO(_env, "applied block service update"); _shared.blockServicesWritten = true; } } if (pulled > 0) { LOG_DEBUG(_env, "flushing and sending %s writes", pulled); _shared.db.flush(true); // important to send all of them after the flush! otherwise it's not durable yet for (int i = 0; i < _sendHdrs.size(); i++) { if (_sendHdrs[i].size() == 0) { continue; } LOG_DEBUG(_env, "sending %s write responses to socket %s", _sendHdrs[i].size(), i); for (int j = 0; j < _sendHdrs[i].size(); j++) { auto& vec = _sendVecs[i][j]; vec.iov_base = &_sendBuf[(size_t)vec.iov_base]; auto& hdr = _sendHdrs[i][j]; hdr.msg_hdr.msg_iov = &vec; } int ret = sendmmsg(_shared.socks[i].fd, &_sendHdrs[i][0], _sendHdrs[i].size(), 0); if (unlikely(ret < 0)) { // we get this when nf drops packets if (errno != EPERM) { throw SYSCALL_EXCEPTION("sendto"); } else { LOG_INFO(_env, "dropping %s responses because of EPERM", _sendHdrs[i].size()); } } else if (unlikely(ret < _sendHdrs[i].size())) { LOG_INFO(_env, "dropping %s out of %s requests since `sendmmsg` could not send them all", _sendHdrs[i].size()-ret, _sendHdrs[i].size()); } } } } }; struct ShardRegisterer : PeriodicLoop { private: ShardShared& _shared; Stopper _stopper; ShardId _shid; std::string _shuckleHost; uint16_t _shucklePort; bool _hasSecondIp; XmonNCAlert _alert; public: ShardRegisterer(Logger& logger, std::shared_ptr& xmon, ShardId shid, const ShardOptions& options, ShardShared& shared) : PeriodicLoop(logger, xmon, "registerer", {1_sec, 1_mins}), _shared(shared), _shid(shid), _shuckleHost(options.shuckleHost), _shucklePort(options.shucklePort), _hasSecondIp(options.ipPorts[1].port != 0) {} void init() { _env.updateAlert(_alert, "Waiting to register ourselves for the first time"); } virtual bool periodicStep() { uint16_t port1 = _shared.ports[0].load(); uint16_t port2 = _shared.ports[1].load(); // Avoid registering with only one port, so that clients can just wait on // the first port being ready and they always have both. if (port1 == 0 || (_hasSecondIp && port2 == 0)) { // shard server isn't up yet return false; } uint32_t ip1 = _shared.ips[0].load(); uint32_t ip2 = _shared.ips[1].load(); LOG_INFO(_env, "Registering ourselves (shard %s, %s:%s, %s:%s) with shuckle", _shid, in_addr{htonl(ip1)}, port1, in_addr{htonl(ip2)}, port2); std::string err = registerShard(_shuckleHost, _shucklePort, 10_sec, _shid, ip1, port1, ip2, port2); if (!err.empty()) { _env.updateAlert(_alert, "Couldn't register ourselves with shuckle: %s", err); return false; } _env.clearAlert(_alert); return true; } }; struct ShardBlockServiceUpdater : PeriodicLoop { private: ShardShared& _shared; ShardId _shid; std::string _shuckleHost; uint16_t _shucklePort; XmonNCAlert _alert; ShardRespContainer _respContainer; std::vector _logEntries; public: ShardBlockServiceUpdater(Logger& logger, std::shared_ptr& xmon, ShardId shid, const ShardOptions& options, ShardShared& shared): PeriodicLoop(logger, xmon, "bs_updater", {1_sec, 1_mins}), _shared(shared), _shid(shid), _shuckleHost(options.shuckleHost), _shucklePort(options.shucklePort) { _env.updateAlert(_alert, "Waiting to fetch block services for the first time"); } virtual bool periodicStep() override { LOG_INFO(_env, "about to fetch block services from %s:%s", _shuckleHost, _shucklePort); _logEntries.clear(); auto& logEntry = _logEntries.emplace_back(); logEntry.logEntry.time = eggsNow(); auto& blockServicesEntry = logEntry.logEntry.body.setUpdateBlockServices(); std::string err = fetchBlockServices(_shuckleHost, _shucklePort, 10_sec, _shid, blockServicesEntry); if (!err.empty()) { _env.updateAlert(_alert, "could not reach shuckle: %s", err); return false; } if (blockServicesEntry.blockServices.els.empty()) { _env.updateAlert(_alert, "got no block services"); return false; } { // The scheme below is a very cheap way to always pick different failure domains // for our block services: we just set the current block services to be all of // different failure domains, sharded by storage type. // // It does require having at least 14 failure domains (to do RS(10,4)), which is // easy right now since we have ~100 failure domains in iceland. // storage class -> failure domain -> block service ids std::unordered_map>> blockServicesByFailureDomain; for (const auto& blockService: logEntry.logEntry.body.getUpdateBlockServices().blockServices.els) { if (blockService.flags & BLOCK_SERVICE_DONT_WRITE) { continue; } __int128 failureDomain; static_assert(sizeof(failureDomain) == sizeof(blockService.failureDomain)); memcpy(&failureDomain, &blockService.failureDomain.name.data[0], sizeof(failureDomain)); blockServicesByFailureDomain[blockService.storageClass][failureDomain].emplace_back(&blockService); } uint64_t rand = wyhash64_rand(); for (const auto& [storageClass, byFailureDomain]: blockServicesByFailureDomain) { for (const auto& [failureDomain, blockServices]: byFailureDomain) { blockServicesEntry.currentBlockServices.els.emplace_back(blockServices[wyhash64(&rand)%blockServices.size()]->id); } } } for (;;) { uint32_t pushed; { std::lock_guard guard(_shared.logEntriesQueuePushLock); pushed = _shared.logEntriesQueue.push(_logEntries); } if (unlikely(pushed == 0)) { _env.updateAlert(_alert, "could not push update block services log entry to queue, will try again"); (100_ms).sleepRetry(); } else { break; } } LOG_DEBUG(_env, "pushed block block service update"); _env.clearAlert(_alert); return true; } }; struct ShardStatsInserter : PeriodicLoop { private: ShardShared& _shared; ShardId _shid; std::string _shuckleHost; uint16_t _shucklePort; XmonNCAlert _alert; std::vector _stats; public: ShardStatsInserter(Logger& logger, std::shared_ptr& xmon, ShardId shid, const ShardOptions& options, ShardShared& shared): PeriodicLoop(logger, xmon, "stats", {1_mins, 1_hours}), _shared(shared), _shid(shid), _shuckleHost(options.shuckleHost), _shucklePort(options.shucklePort) {} virtual bool periodicStep() override { for (ShardMessageKind kind : allShardMessageKind) { std::ostringstream prefix; prefix << "shard." << std::setw(3) << std::setfill('0') << _shid << "." << kind; _shared.timings[(int)kind].toStats(prefix.str(), _stats); _shared.errors[(int)kind].toStats(prefix.str(), _stats); } LOG_INFO(_env, "inserting stats"); std::string err = insertStats(_shuckleHost, _shucklePort, 10_sec, _stats); _stats.clear(); if (err.empty()) { _env.clearAlert(_alert); for (ShardMessageKind kind : allShardMessageKind) { _shared.timings[(int)kind].reset(); _shared.errors[(int)kind].reset(); } return true; } else { _env.updateAlert(_alert, "Could not insert stats: %s", err); return false; } } // TODO restore this when we have the functionality to do so // virtual void finish() override { // LOG_INFO(_env, "insert stats one last time"); // periodicStep(); // } }; struct ShardMetricsInserter : PeriodicLoop { private: ShardShared& _shared; ShardId _shid; XmonNCAlert _alert; MetricsBuilder _metricsBuilder; std::unordered_map _rocksDBStats; public: ShardMetricsInserter(Logger& logger, std::shared_ptr& xmon, ShardId shid, ShardShared& shared): PeriodicLoop(logger, xmon, "metrics", {1_sec, 1.0, 1_mins, 0.1}), _shared(shared), _shid(shid) {} virtual bool periodicStep() { _shared.db.dumpRocksDBStatistics(); auto now = eggsNow(); for (ShardMessageKind kind : allShardMessageKind) { const ErrorCount& errs = _shared.errors[(int)kind]; for (int i = 0; i < errs.count.size(); i++) { uint64_t count = errs.count[i].load(); if (count == 0) { continue; } _metricsBuilder.measurement("eggsfs_shard_requests"); _metricsBuilder.tag("shard", _shid); _metricsBuilder.tag("kind", kind); _metricsBuilder.tag("write", !readOnlyShardReq(kind)); if (i == 0) { _metricsBuilder.tag("error", "NO_ERROR"); } else { _metricsBuilder.tag("error", (EggsError)i); } _metricsBuilder.fieldU64("count", count); _metricsBuilder.timestamp(now); } } { _metricsBuilder.measurement("eggsfs_shard_write_queue"); _metricsBuilder.tag("shard", _shid); _metricsBuilder.fieldFloat("size", _shared.logEntriesQueueSize); _metricsBuilder.timestamp(now); } for (int i = 0; i < _shared.receivedRequests.size(); i++) { _metricsBuilder.measurement("eggsfs_shard_received_requests"); _metricsBuilder.tag("shard", _shid); _metricsBuilder.tag("socket", i); _metricsBuilder.fieldFloat("count", _shared.receivedRequests[i]); _metricsBuilder.timestamp(now); } { _metricsBuilder.measurement("eggsfs_shard_pulled_write_requests"); _metricsBuilder.tag("shard", _shid); _metricsBuilder.fieldFloat("count", _shared.pulledWriteRequests); _metricsBuilder.timestamp(now); } { _rocksDBStats.clear(); _shared.db.rocksDBMetrics(_rocksDBStats); for (const auto& [name, value]: _rocksDBStats) { _metricsBuilder.measurement("eggsfs_shard_rocksdb"); _metricsBuilder.tag("shard", _shid); _metricsBuilder.fieldU64(name, value); _metricsBuilder.timestamp(now); } } std::string err = sendMetrics(10_sec, _metricsBuilder.payload()); _metricsBuilder.reset(); if (err.empty()) { LOG_INFO(_env, "Sent metrics to influxdb"); _env.clearAlert(_alert); return true; } else { _env.updateAlert(_alert, "Could not insert metrics: %s", err); return false; } } }; void runShard(ShardId shid, const std::string& dbDir, const ShardOptions& options) { int logOutFd = STDOUT_FILENO; if (!options.logFile.empty()) { logOutFd = open(options.logFile.c_str(), O_WRONLY|O_CREAT|O_APPEND, 0644); if (logOutFd < 0) { throw SYSCALL_EXCEPTION("open"); } } Logger logger(options.logLevel, logOutFd, options.syslog, true); std::shared_ptr xmon; if (options.xmon) { xmon = std::make_shared(); } Env env(logger, xmon, "startup"); { LOG_INFO(env, "Running shard %s with options:", shid); LOG_INFO(env, " level = %s", options.logLevel); LOG_INFO(env, " logFile = '%s'", options.logFile); LOG_INFO(env, " shuckleHost = '%s'", options.shuckleHost); LOG_INFO(env, " shucklePort = %s", options.shucklePort); for (int i = 0; i < 2; i++) { LOG_INFO(env, " port%s = %s", i+1, options.ipPorts[0].port); { char ip[INET_ADDRSTRLEN]; uint32_t ipN = options.ipPorts[i].ip; LOG_INFO(env, " ownIp%s = %s", i+1, inet_ntop(AF_INET, &ipN, ip, INET_ADDRSTRLEN)); } } LOG_INFO(env, " simulateIncomingPacketDrop = %s", options.simulateIncomingPacketDrop); LOG_INFO(env, " simulateOutgoingPacketDrop = %s", options.simulateOutgoingPacketDrop); LOG_INFO(env, " syslog = %s", (int)options.syslog); } std::vector threads; if (xmon) { threads.emplace_back([&logger, xmon, shid, &options]() mutable { XmonConfig config; { std::ostringstream ss; ss << std::setw(3) << std::setfill('0') << shid; config.appInstance = "eggsshard" + ss.str(); } config.prod = options.xmonProd; config.appType = "restech_eggsfs.critical"; Xmon(logger, xmon, config).run(); }); } XmonNCAlert dbInitAlert; env.updateAlert(dbInitAlert, "initializing database"); ShardDB db(logger, xmon, shid, options.transientDeadlineInterval, dbDir); env.clearAlert(dbInitAlert); ShardShared shared(db); threads.emplace_back([&logger, xmon, &options, &shared]() mutable { ShardWriter(logger, xmon, options, shared).run(); }); threads.emplace_back([&logger, xmon, shid, &options, &shared]() mutable { ShardRegisterer(logger, xmon, shid, options, shared).run(); }); threads.emplace_back([&logger, xmon, shid, &options, &shared]() mutable { ShardBlockServiceUpdater(logger, xmon, shid, options, shared).run(); }); threads.emplace_back([&logger, xmon, shid, &options, &shared]() mutable { ShardStatsInserter(logger, xmon, shid, options, shared).run(); }); if (options.metrics) { threads.emplace_back([&logger, xmon, shid, &shared]() mutable { ShardMetricsInserter(logger, xmon, shid, shared).run(); }); } ShardServer(logger, xmon, shid, options, shared).run(); }