#include "Shard.hpp" #include #include #include #include #include #include #include "Assert.hpp" #include "Bincode.hpp" #include "BlockServicesCacheDB.hpp" #include "CDCKey.hpp" #include "Common.hpp" #include "Crypto.hpp" #include "Env.hpp" #include "ErrorCount.hpp" #include "Exception.hpp" #include "LogsDB.hpp" #include "Loop.hpp" #include "Metrics.hpp" #include "MsgsGen.hpp" #include "MultiplexedChannel.hpp" #include "PeriodicLoop.hpp" #include "Protocol.hpp" #include "ShardDB.hpp" #include "ShardKey.hpp" #include "SharedRocksDB.hpp" #include "Shuckle.hpp" #include "SPSC.hpp" #include "Time.hpp" #include "Timings.hpp" #include "UDPSocketPair.hpp" #include "wyhash.h" #include "Xmon.hpp" struct QueuedShardLogEntry { ShardLogEntry logEntry; // if requestId == 0, the rest is all garbage -- we don't need it. // this is for log entries that are not generated by users (e.g. // block service updates). uint64_t requestId; EggsTime receivedAt; IpPort clientAddr; int sockIx; // which sock to use to reply ShardMessageKind requestKind; bool checkPointed; }; struct SnapshotRequest { uint64_t requestId; EggsTime receivedAt; uint64_t snapshotId; IpPort clientAddr; int sockIx; // which sock to use to reply }; // TODO make options const int LOG_ENTRIES_QUEUE_SIZE = 8192; // a few megabytes, should be quite a bit bigger than the below const int MAX_RECV_MSGS = 100; enum class WriterQueueEntryKind :uint8_t { LOGSDB_REQUEST = 1, LOGSDB_RESPONSE = 2, SHARD_LOG_ENTRY = 3, SNAPSHOT_REQUEST = 4, }; std::ostream& operator<<(std::ostream& out, WriterQueueEntryKind kind) { switch (kind) { case WriterQueueEntryKind::LOGSDB_REQUEST: out << "LOGSDB_REQUEST"; break; case WriterQueueEntryKind::LOGSDB_RESPONSE: out << "LOGSDB_RESPONSE"; break; case WriterQueueEntryKind::SHARD_LOG_ENTRY: out << "SHARD_LOG_ENTRY"; break; case WriterQueueEntryKind::SNAPSHOT_REQUEST: out << "SNAPSHOT_REQUEST"; break; default: out << "Unknown WriterQueueEntryKind(" << (uint8_t)kind << ")"; break; } return out; } class ShardWriterRequest { public: ShardWriterRequest() { clear(); } ShardWriterRequest(ShardWriterRequest&& other) { *this = std::move(other); } ShardWriterRequest& operator=(ShardWriterRequest&& other) { _kind = other._kind; _data = std::move(other._data); other.clear(); return *this; } void clear() { _kind = (WriterQueueEntryKind)0; } WriterQueueEntryKind kind() const { return _kind; } LogsDBRequest& setLogsDBRequest() { _kind = WriterQueueEntryKind::LOGSDB_REQUEST; auto& x = _data.emplace<0>(); return x; } const LogsDBRequest& getLogsDBRequest() const { ALWAYS_ASSERT(_kind == WriterQueueEntryKind::LOGSDB_REQUEST, "%s != %s", _kind, WriterQueueEntryKind::LOGSDB_REQUEST); return std::get<0>(_data); } LogsDBRequest&& moveLogsDBRequest() { ALWAYS_ASSERT(_kind == WriterQueueEntryKind::LOGSDB_REQUEST, "%s != %s", _kind, WriterQueueEntryKind::LOGSDB_REQUEST); clear(); return std::move(std::get<0>(_data)); } LogsDBResponse& setLogsDBResponse() { _kind = WriterQueueEntryKind::LOGSDB_RESPONSE; auto& x = _data.emplace<1>(); return x; } const LogsDBResponse& getLogsDBResponse() const { ALWAYS_ASSERT(_kind == WriterQueueEntryKind::LOGSDB_RESPONSE, "%s != %s", _kind, WriterQueueEntryKind::LOGSDB_RESPONSE); return std::get<1>(_data); } LogsDBResponse&& moveLogsDBResponse() { ALWAYS_ASSERT(_kind == WriterQueueEntryKind::LOGSDB_RESPONSE, "%s != %s", _kind, WriterQueueEntryKind::LOGSDB_RESPONSE); clear(); return std::move(std::get<1>(_data)); } QueuedShardLogEntry& setQueuedShardLogEntry() { _kind = WriterQueueEntryKind::SHARD_LOG_ENTRY; auto& x = _data.emplace<2>(); return x; } const QueuedShardLogEntry& getQueuedShardLogEntry() const { ALWAYS_ASSERT(_kind == WriterQueueEntryKind::SHARD_LOG_ENTRY, "%s != %s", _kind, WriterQueueEntryKind::SHARD_LOG_ENTRY); return std::get<2>(_data); } QueuedShardLogEntry&& moveQueuedShardLogEntry() { ALWAYS_ASSERT(_kind == WriterQueueEntryKind::SHARD_LOG_ENTRY, "%s != %s", _kind, WriterQueueEntryKind::SHARD_LOG_ENTRY); clear(); return std::move(std::get<2>(_data)); } SnapshotRequest& setSnapshotRequest() { _kind = WriterQueueEntryKind::SNAPSHOT_REQUEST; auto& x = _data.emplace<3>(); return x; } const SnapshotRequest& getSnapshotRequest() const { ALWAYS_ASSERT(_kind == WriterQueueEntryKind::SNAPSHOT_REQUEST, "%s != %s", _kind, WriterQueueEntryKind::SNAPSHOT_REQUEST); return std::get<3>(_data); } SnapshotRequest&& moveSnapshotRequest() { ALWAYS_ASSERT(_kind == WriterQueueEntryKind::SNAPSHOT_REQUEST, "%s != %s", _kind, WriterQueueEntryKind::SNAPSHOT_REQUEST); clear(); return std::move(std::get<3>(_data)); } private: WriterQueueEntryKind _kind; std::variant _data; }; struct ShardShared { const ShardOptions& options; SharedRocksDB& sharedDB; BlockServicesCacheDB& blockServicesCache; ShardDB& shardDB; LogsDB& logsDB; std::array socks; // in an array to play with UDPReceiver<> std::array timings; std::array errors; SPSC writerRequestsQueue; std::atomic logEntriesQueueSize; std::array, 2> receivedRequests; // how many requests we got at once from each socket std::atomic pulledWriteRequests; // how many requests we got from write queue std::shared_ptr> replicas; std::atomic isLeader; ShardShared() = delete; ShardShared(const ShardOptions& options_, SharedRocksDB& sharedDB_, BlockServicesCacheDB& blockServicesCache_, ShardDB& shardDB_, LogsDB& logsDB_, UDPSocketPair&& sock) : options(options_), sharedDB(sharedDB_), blockServicesCache(blockServicesCache_), shardDB(shardDB_), logsDB(logsDB_), socks({std::move(sock)}), writerRequestsQueue(LOG_ENTRIES_QUEUE_SIZE), logEntriesQueueSize(0), pulledWriteRequests(0) { for (ShardMessageKind kind : allShardMessageKind) { timings[(int)kind] = Timings::Standard(); } for (auto& x: receivedRequests) { x = 0; } } const UDPSocketPair& sock() const { return socks[0]; } }; static bool bigRequest(ShardMessageKind kind) { return unlikely( kind == ShardMessageKind::ADD_SPAN_INITIATE || kind == ShardMessageKind::ADD_SPAN_CERTIFY ); } static bool bigResponse(ShardMessageKind kind) { return unlikely( kind == ShardMessageKind::READ_DIR || kind == ShardMessageKind::ADD_SPAN_INITIATE || kind == ShardMessageKind::FILE_SPANS || kind == ShardMessageKind::VISIT_DIRECTORIES || kind == ShardMessageKind::VISIT_FILES || kind == ShardMessageKind::VISIT_TRANSIENT_FILES || kind == ShardMessageKind::BLOCK_SERVICE_FILES || kind == ShardMessageKind::FULL_READ_DIR ); } static void packShardResponse( Env& env, ShardShared& shared, const AddrsInfo& srcAddr, UDPSender& sender, Duration elapsed, bool dropArtificially, const IpPort& clientAddr, uint8_t sockIx, ShardMessageKind reqKind, const ShardRespMsg& msg ) { auto respKind = msg.body.kind(); shared.timings[(int)reqKind].add(elapsed); shared.errors[(int)reqKind].add( respKind != ShardMessageKind::ERROR ? EggsError::NO_ERROR : msg.body.getError()); if (unlikely(dropArtificially)) { LOG_DEBUG(env, "artificially dropping response %s", msg.id); return; } ALWAYS_ASSERT(clientAddr.port != 0); if (respKind != ShardMessageKind::ERROR) { LOG_DEBUG(env, "successfully processed request %s with kind %s in %s", msg.id, reqKind, elapsed); if (bigResponse(respKind)) { if (unlikely(env._shouldLog(LogLevel::LOG_TRACE))) { LOG_TRACE(env, "resp: %s", msg); } else { LOG_DEBUG(env, "resp: "); } } else { LOG_DEBUG(env, "resp: %s", msg); } } else { LOG_DEBUG(env, "request %s failed with error %s in %s", reqKind, msg.body.getError(), elapsed); } sender.prepareOutgoingMessage(env, srcAddr, sockIx, clientAddr, [&msg](BincodeBuf& buf) { msg.pack(buf); }); LOG_DEBUG(env, "will send response for req id %s kind %s to %s", msg.id, reqKind, clientAddr); } static void packCheckPointedShardResponse( Env& env, ShardShared& shared, const AddrsInfo& srcAddr, UDPSender& sender, Duration elapsed, bool dropArtificially, const IpPort& clientAddr, uint8_t sockIx, ShardMessageKind reqKind, const ShardCheckPointedRespMsg& msg, const AES128Key& key ) { auto respKind = msg.body.resp.kind(); shared.timings[(int)reqKind].add(elapsed); shared.errors[(int)reqKind].add( respKind != ShardMessageKind::ERROR ? EggsError::NO_ERROR : msg.body.resp.getError()); if (unlikely(dropArtificially)) { LOG_DEBUG(env, "artificially dropping response %s", msg.id); return; } ALWAYS_ASSERT(clientAddr.port != 0); if (respKind != ShardMessageKind::ERROR) { LOG_DEBUG(env, "successfully processed request %s with kind %s in %s", msg.id, reqKind, elapsed); if (bigResponse(respKind)) { if (unlikely(env._shouldLog(LogLevel::LOG_TRACE))) { LOG_TRACE(env, "resp: %s", msg); } else { LOG_DEBUG(env, "resp: "); } } else { LOG_DEBUG(env, "resp: %s", msg); } } else { LOG_DEBUG(env, "request %s failed with error %s in %s", reqKind, msg.body.resp.getError(), elapsed); } sender.prepareOutgoingMessage(env, srcAddr, sockIx, clientAddr, [&msg, &key](BincodeBuf& buf) { msg.pack(buf, key); }); LOG_DEBUG(env, "will send response for req id %s kind %s to %s", msg.id, reqKind, clientAddr); } // a hack while parts of messages in shard protocol are signed and part are unsigned static ShardMessageKind peekKind(const BincodeBuf& buf) { auto tmpBuf = buf; uint32_t version = tmpBuf.unpackScalar(); if (version != SHARD_REQ_PROTOCOL_VERSION) { throw BINCODE_EXCEPTION("bad protocol version %s, expected %s", version, SHARD_REQ_PROTOCOL_VERSION); } tmpBuf.unpackScalar(); return tmpBuf.unpackScalar(); } struct ShardServer : Loop { private: // init data ShardShared& _shared; ShardReplicaId _shrid; uint64_t _packetDropRand; uint64_t _outgoingPacketDropProbability; // probability * 10,000 // run data AES128Key _expandedCDCKey; AES128Key _expandedShardKey; // log entries buffers std::vector _logEntries; std::unique_ptr> _receiver; std::unique_ptr _sender; std::unique_ptr{SHARD_REQ_PROTOCOL_VERSION, SHARD_CHECK_POINTED_REQ_PROTOCOL_VERSION, LOG_REQ_PROTOCOL_VERSION, LOG_RESP_PROTOCOL_VERSION}>> _channel; public: ShardServer(Logger& logger, std::shared_ptr& xmon, ShardShared& shared) : Loop(logger, xmon, "server"), _shared(shared), _shrid(shared.options.shrid), _packetDropRand(eggsNow().ns), _outgoingPacketDropProbability(0) { auto convertProb = [this](const std::string& what, double prob, uint64_t& iprob) { if (prob != 0.0) { LOG_INFO(_env, "Will drop %s%% of %s packets", prob*100.0, what); iprob = prob * 10'000.0; ALWAYS_ASSERT(iprob > 0 && iprob < 10'000); } }; convertProb("outgoing", _shared.options.simulateOutgoingPacketDrop, _outgoingPacketDropProbability); expandKey(ShardKey, _expandedShardKey); expandKey(CDCKey, _expandedCDCKey); _receiver = std::make_unique>(UDPReceiverConfig{.perSockMaxRecvMsg = MAX_RECV_MSGS, .maxMsgSize = MAX_UDP_MTU}); _sender = std::make_unique(UDPSenderConfig{.maxMsgSize = MAX_UDP_MTU}); _channel = std::make_unique{SHARD_REQ_PROTOCOL_VERSION, SHARD_CHECK_POINTED_REQ_PROTOCOL_VERSION, LOG_REQ_PROTOCOL_VERSION, LOG_RESP_PROTOCOL_VERSION}>>(); } virtual ~ShardServer() = default; private: void _handleLogsDBResponse(UDPMessage& msg) { LOG_DEBUG(_env, "received LogsDBResponse from %s", msg.clientAddr); auto replicaId = _getReplicaId(msg.clientAddr); if (replicaId == LogsDB::REPLICA_COUNT) { LOG_DEBUG(_env, "We can't match this address to replica. Dropping"); return; } auto& resp = _logEntries.emplace_back().setLogsDBResponse(); resp.replicaId = replicaId; try { resp.msg.unpack(msg.buf, _expandedShardKey); } catch (const BincodeException& err) { LOG_ERROR(_env, "Could not parse LogsDBResponse: %s", err.what()); _logEntries.pop_back(); return; } LOG_DEBUG(_env, "Received response %s for requests id %s from replica id %s", resp.msg.body.kind(), resp.msg.id, resp.replicaId); } void _handleLogsDBRequest(UDPMessage& msg) { LOG_DEBUG(_env, "received LogsDBRequest from %s", msg.clientAddr); auto replicaId = _getReplicaId(msg.clientAddr); if (replicaId == LogsDB::REPLICA_COUNT) { LOG_DEBUG(_env, "We can't match this address to replica. Dropping"); return; } auto& req = _logEntries.emplace_back().setLogsDBRequest(); req.replicaId = replicaId; try { req.msg.unpack(msg.buf, _expandedShardKey); } catch (const BincodeException& err) { LOG_ERROR(_env, "Could not parse LogsDBRequest: %s", err.what()); _logEntries.pop_back(); return; } LOG_DEBUG(_env, "Received request %s with requests id %s from replica id %s", req.msg.body.kind(), req.msg.id, req.replicaId); } uint8_t _getReplicaId(const IpPort& clientAddress) { auto replicasPtr = _shared.replicas; if (!replicasPtr) { return LogsDB::REPLICA_COUNT; } for (ReplicaId replicaId = 0; replicaId.u8 < replicasPtr->size(); ++replicaId.u8) { if (replicasPtr->at(replicaId.u8).contains(clientAddress)) { return replicaId.u8; } } return LogsDB::REPLICA_COUNT; } void _handleShardRequest(UDPMessage& msg, bool checkPointed) { LOG_DEBUG(_env, "received message from %s", msg.clientAddr); if (unlikely(!_shared.isLeader.load(std::memory_order_relaxed))) { LOG_DEBUG(_env, "not leader, dropping request %s", msg.clientAddr); return; } ShardReqMsg req; try { if (checkPointed) { ShardCheckPointedReqMsg signedReq; signedReq.unpack(msg.buf, _expandedCDCKey); req.id = signedReq.id; req.body = std::move(signedReq.body); } else if (isPrivilegedRequestKind((uint8_t)peekKind(msg.buf))) { SignedShardReqMsg signedReq; signedReq.unpack(msg.buf, _expandedCDCKey); req.id = signedReq.id; req.body = std::move(signedReq.body); } else { req.unpack(msg.buf); } } catch (const BincodeException& err) { LOG_ERROR(_env, "Could not parse: %s", err.what()); RAISE_ALERT(_env, "could not parse request from %s, dropping it.", msg.clientAddr); return; } auto t0 = eggsNow(); LOG_DEBUG(_env, "received request id %s, kind %s, from %s", req.id, req.body.kind(), msg.clientAddr); if (bigRequest(req.body.kind())) { if (unlikely(_env._shouldLog(LogLevel::LOG_TRACE))) { LOG_TRACE(_env, "parsed request: %s", req); } else { LOG_DEBUG(_env, "parsed request: "); } } else { LOG_DEBUG(_env, "parsed request: %s", req); } // At this point, if it's a read request, we can process it, // if it's a write request we prepare the log entry and // send it off. ShardRespMsg respContainer; ShardCheckPointedRespMsg checkpointedRespContainer; checkpointedRespContainer.id = respContainer.id = req.id; if (readOnlyShardReq(req.body.kind())) { checkpointedRespContainer.body.checkPointIdx = _shared.shardDB.read(req.body, checkPointed ? checkpointedRespContainer.body.resp : respContainer.body); } else if (unlikely(req.body.kind() == ShardMessageKind::SHARD_SNAPSHOT)) { auto& entry = _logEntries.emplace_back().setSnapshotRequest(); entry.sockIx = msg.socketIx; entry.clientAddr = msg.clientAddr; entry.receivedAt = t0; entry.snapshotId = req.body.getShardSnapshot().snapshotId; entry.requestId = req.id; return; } else { auto& entry = _logEntries.emplace_back().setQueuedShardLogEntry(); entry.sockIx = msg.socketIx; entry.clientAddr = msg.clientAddr; entry.receivedAt = t0; entry.requestKind = req.body.kind(); entry.requestId = req.id; entry.checkPointed = checkPointed; auto err = _shared.shardDB.prepareLogEntry(req.body, entry.logEntry); if (likely(err == EggsError::NO_ERROR)) { return; // we're done here, move along } else { (checkPointed ? checkpointedRespContainer.body.resp : respContainer.body).setError() = err; _logEntries.pop_back(); // back out the log entry } } Duration elapsed = eggsNow() - t0; bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability; if (checkPointed) { packCheckPointedShardResponse(_env, _shared, _shared.sock().addr(), *_sender, elapsed, dropArtificially, msg.clientAddr, msg.socketIx, req.body.kind(), checkpointedRespContainer, _expandedCDCKey); } else { packShardResponse(_env, _shared, _shared.sock().addr(), *_sender, elapsed, dropArtificially, msg.clientAddr, msg.socketIx, req.body.kind(), respContainer); } } public: virtual void step() override { if (unlikely(!_shared.blockServicesCache.haveBlockServices())) { (100_ms).sleepRetry(); return; } _logEntries.clear(); if (unlikely(!_channel->receiveMessages(_env, _shared.socks, *_receiver))) { return; } for (auto& msg : _channel->protocolMessages(LOG_RESP_PROTOCOL_VERSION)) { _handleLogsDBResponse(msg); } for (auto& msg : _channel->protocolMessages(LOG_REQ_PROTOCOL_VERSION)) { _handleLogsDBRequest(msg); } std::array shardMsgCount{0, 0}; for (auto& msg : _channel->protocolMessages(SHARD_REQ_PROTOCOL_VERSION)) { _handleShardRequest(msg, false); ++shardMsgCount[msg.socketIx]; } for (auto& msg : _channel->protocolMessages(SHARD_CHECK_POINTED_REQ_PROTOCOL_VERSION)) { _handleShardRequest(msg, true); ++shardMsgCount[msg.socketIx]; } for (size_t i = 0; i < _shared.receivedRequests.size(); ++i) { _shared.receivedRequests[i] = _shared.receivedRequests[i]*0.95 + ((double)shardMsgCount[i])*0.05; } // write out write requests to queue { size_t numLogEntries = _logEntries.size(); if (numLogEntries > 0) { LOG_DEBUG(_env, "pushing %s log entries to writer", numLogEntries); uint32_t pushed = _shared.writerRequestsQueue.push(_logEntries); _shared.logEntriesQueueSize = _shared.logEntriesQueueSize*0.95 + _shared.writerRequestsQueue.size()*0.05; if (pushed < numLogEntries) { LOG_INFO(_env, "tried to push %s elements to write queue, but pushed %s instead", numLogEntries, pushed); } } } // write out read responses to UDP _sender->sendMessages(_env, _shared.sock()); } }; struct ShardWriter : Loop { private: const std::string _basePath; ShardShared& _shared; AES128Key _expandedShardKey; AES128Key _expandedCDCKey; uint64_t _currentLogIndex; ShardRespMsg _respContainer; ShardCheckPointedRespMsg _checkPointedrespContainer; std::vector _requests; const size_t _maxWritesAtOnce; std::vector _logEntries; SnapshotRequest _snapshotRequest; LogsDB& _logsDB; const bool _dontDoReplication; std::vector _logsDBRequests; std::vector _logsDBResponses; std::vector _logsDBOutRequests; std::vector _logsDBOutResponses; std::unordered_map _inFlightEntries; std::vector _outgoingLogEntries; std::shared_ptr> _replicaInfo; UDPSender _sender; uint64_t _packetDropRand; uint64_t _outgoingPacketDropProbability; // probability * 10,000 virtual void sendStop() override { _shared.writerRequestsQueue.close(); } public: ShardWriter(Logger& logger, std::shared_ptr& xmon, ShardShared& shared) : Loop(logger, xmon, "writer"), _basePath(shared.options.dbDir), _shared(shared), _maxWritesAtOnce(LogsDB::IN_FLIGHT_APPEND_WINDOW * 10), _snapshotRequest({0,0,0,{},0}), _logsDB(shared.logsDB), _dontDoReplication(_shared.options.dontDoReplication), _sender(UDPSenderConfig{.maxMsgSize = MAX_UDP_MTU}), _packetDropRand(eggsNow().ns), _outgoingPacketDropProbability(0) { expandKey(ShardKey, _expandedShardKey); expandKey(CDCKey, _expandedCDCKey); _currentLogIndex = _shared.shardDB.lastAppliedLogEntry(); auto convertProb = [this](const std::string& what, double prob, uint64_t& iprob) { if (prob != 0.0) { LOG_INFO(_env, "Will drop %s%% of %s packets", prob*100.0, what); iprob = prob * 10'000.0; ALWAYS_ASSERT(iprob > 0 && iprob < 10'000); } }; convertProb("outgoing", _shared.options.simulateOutgoingPacketDrop, _outgoingPacketDropProbability); _requests.reserve(_maxWritesAtOnce); // In case of force leader it immediately detects and promotes us to leader _logsDB.processIncomingMessages(_logsDBRequests, _logsDBResponses); _shared.isLeader.store(_logsDB.isLeader(), std::memory_order_relaxed); } virtual ~ShardWriter() = default; void logsDBStep() { LOG_DEBUG(_env, "Calling LogsDB processIncomingMessages with %s requests and %s responses", _logsDBRequests.size(), _logsDBResponses.size()); _logsDB.processIncomingMessages(_logsDBRequests,_logsDBResponses); _shared.isLeader.store(_logsDB.isLeader(), std::memory_order_relaxed); bool droppedDueToInFlightWindow = false; // If we are leader write any outstanding entries std::vector logsDBEntries; if (!_logsDB.isLeader()) { //TODO: we could notify clients we are no longer leader if (_logEntries.size() > 0) { LOG_INFO(_env, "Received %s shard write requests but we are not log leader. dropping", _logEntries.size()); _logEntries.clear(); } if (_inFlightEntries.size() > 0) { LOG_INFO(_env, "There are %s in flight shard write requests but we are no longer log leader. dropping", _inFlightEntries.size()); _inFlightEntries.clear(); } } else if (_logEntries.size()) { logsDBEntries.reserve(_logEntries.size()); std::vector data; data.resize(MAX_UDP_MTU); logsDBEntries.reserve(_requests.size()); LogIdx expectedIdx = _currentLogIndex + _inFlightEntries.size(); for (auto& queuedLogEntry : _logEntries) { auto& logsDBEntry = logsDBEntries.emplace_back();; queuedLogEntry.logEntry.idx = ++expectedIdx; BincodeBuf buf((char*)&data[0], MAX_UDP_MTU); queuedLogEntry.logEntry.pack(buf); logsDBEntry.value.assign(buf.data, buf.cursor); } auto err = _logsDB.appendEntries(logsDBEntries); ALWAYS_ASSERT(err == EggsError::NO_ERROR); ALWAYS_ASSERT(_logEntries.size() == logsDBEntries.size()); for (size_t i = 0; i < _logEntries.size(); ++i) { if (logsDBEntries[i].idx == 0) { // if we don't wait for replication the window gets cleared immediately ALWAYS_ASSERT(!_dontDoReplication); droppedDueToInFlightWindow = true; LOG_INFO(_env, "Appended %s out of %s shard write requests. Log in flight windows is full. Dropping other entries", i, _logEntries.size()); break; } ALWAYS_ASSERT(logsDBEntries[i].idx == _logEntries[i].logEntry.idx); _inFlightEntries.emplace(_logEntries[i].logEntry.idx.u64, std::move(_logEntries[i])); } logsDBEntries.clear(); if (_dontDoReplication) { // usually the state machine is moved by responses if we don't expect any we move it manually _logsDBRequests.clear(); _logsDBResponses.clear(); _logsDB.processIncomingMessages(_logsDBRequests, _logsDBResponses); } } // Log if not active is not chaty but it's messages are higher priority as they make us progress state under high load. // We want to have priority when sending out _logsDB.getOutgoingMessages(_logsDBOutRequests, _logsDBOutResponses); for (auto& response : _logsDBOutResponses) { packLogsDBResponse(response); } for (auto request : _logsDBOutRequests) { packLogsDBRequest(*request); } _logsDB.readEntries(logsDBEntries); _outgoingLogEntries.reserve(logsDBEntries.size()); if (_dontDoReplication && _logsDB.isLeader()) { ALWAYS_ASSERT(_inFlightEntries.size() == logsDBEntries.size()); } for (auto& logsDBEntry : logsDBEntries) { ++_currentLogIndex; ALWAYS_ASSERT(_currentLogIndex == logsDBEntry.idx); ALWAYS_ASSERT(logsDBEntry.value.size() > 0); BincodeBuf buf((char*)&logsDBEntry.value.front(), logsDBEntry.value.size()); ShardLogEntry shardEntry; shardEntry.unpack(buf); ALWAYS_ASSERT(_currentLogIndex == shardEntry.idx); auto it = _inFlightEntries.find(shardEntry.idx.u64); if (_dontDoReplication && _logsDB.isLeader()) { ALWAYS_ASSERT(it != _inFlightEntries.end()); ALWAYS_ASSERT(shardEntry == it->second.logEntry); } _checkPointedrespContainer.body.checkPointIdx = logsDBEntry.idx; auto& resp = (it != _inFlightEntries.end() && it->second.checkPointed) ? _checkPointedrespContainer.body.resp : _respContainer.body; _shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, resp); if (it != _inFlightEntries.end()) { auto& logEntry = _outgoingLogEntries.emplace_back(std::move(it->second)); _inFlightEntries.erase(it); ALWAYS_ASSERT(shardEntry == logEntry.logEntry); if (likely(logEntry.requestId)) { LOG_DEBUG(_env, "applying log entry for request %s kind %s from %s", logEntry.requestId, logEntry.requestKind, logEntry.clientAddr); } else { LOG_DEBUG(_env, "applying request-less log entry"); } if (likely(logEntry.requestId)) { _checkPointedrespContainer.id = _respContainer.id = logEntry.requestId; Duration elapsed = eggsNow() - logEntry.receivedAt; bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability; if (logEntry.checkPointed) { packCheckPointedShardResponse(_env, _shared, _shared.sock().addr(), _sender, elapsed, dropArtificially, logEntry.clientAddr, logEntry.sockIx, logEntry.requestKind, _checkPointedrespContainer, _expandedCDCKey); } else { packShardResponse(_env, _shared, _shared.sock().addr(), _sender, elapsed, dropArtificially, logEntry.clientAddr, logEntry.sockIx, logEntry.requestKind, _respContainer); } } else if (unlikely(resp.kind() == ShardMessageKind::ERROR)) { RAISE_ALERT(_env, "could not apply request-less log entry: %s", resp.getError()); } } } if (unlikely(droppedDueToInFlightWindow)) { // We can't check before as we first need to process and remove entries that have been released ALWAYS_ASSERT(_inFlightEntries.size() == LogsDB::IN_FLIGHT_APPEND_WINDOW); } _shared.shardDB.flush(true); // not needed as we just flushed and apparently it does actually flush again // _logsDB.flush(true); if (unlikely(_snapshotRequest.requestId != 0)) { _respContainer.id = _snapshotRequest.requestId; auto err = _shared.sharedDB.snapshot(_basePath +"/snapshot-" + std::to_string(_snapshotRequest.snapshotId)); Duration elapsed = eggsNow() - _snapshotRequest.receivedAt; if (err == EggsError::NO_ERROR) { _respContainer.body.setShardSnapshot(); } else { _respContainer.body.setError() = err; } packShardResponse(_env, _shared, _shared.sock().addr(), _sender, elapsed, false, _snapshotRequest.clientAddr, _snapshotRequest.sockIx, ShardMessageKind::SHARD_SNAPSHOT, _respContainer); _snapshotRequest.requestId = 0; } _sender.sendMessages(_env, _shared.sock()); } AddrsInfo* addressFromReplicaId(ReplicaId id) { if (!_replicaInfo) { return nullptr; } auto& addr = (*_replicaInfo)[id.u8]; if (addr[0].port == 0) { return nullptr; } return &addr; } void packLogsDBResponse(LogsDBResponse& response) { auto addrInfoPtr = addressFromReplicaId(response.replicaId); if (unlikely(addrInfoPtr == nullptr)) { LOG_DEBUG(_env, "No information for replica id %s. dropping response", response.replicaId); return; } auto& addrInfo = *addrInfoPtr; bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability; if (unlikely(dropArtificially)) { LOG_DEBUG(_env, "artificially dropping response %s", response.msg.id); return; } _sender.prepareOutgoingMessage( _env, _shared.sock().addr(), addrInfo, [&response,this](BincodeBuf& buf) { response.msg.pack(buf, _expandedShardKey); }); LOG_DEBUG(_env, "will send response for req id %s kind %s to %s", response.msg.id, response.msg.body.kind(), addrInfo); } void packLogsDBRequest(LogsDBRequest& request) { auto addrInfoPtr = addressFromReplicaId(request.replicaId); if (unlikely(addrInfoPtr == nullptr)) { LOG_DEBUG(_env, "No information for replica id %s. dropping request", request.replicaId); return; } auto& addrInfo = *addrInfoPtr; bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability; if (unlikely(dropArtificially)) { LOG_DEBUG(_env, "artificially dropping request %s", request.msg.id); return; } _sender.prepareOutgoingMessage( _env, _shared.sock().addr(), addrInfo, [&request,this](BincodeBuf& buf) { request.msg.pack(buf, _expandedShardKey); }); LOG_DEBUG(_env, "will send request for req id %s kind %s to %s", request.msg.id, request.msg.body.kind(), addrInfo); } virtual void step() override { _requests.clear(); _logsDBRequests.clear(); _logsDBResponses.clear(); _logsDBOutRequests.clear(); _logsDBOutResponses.clear(); _logEntries.clear(); _outgoingLogEntries.clear(); _replicaInfo = _shared.replicas; uint32_t pulled = _shared.writerRequestsQueue.pull(_requests, _maxWritesAtOnce, _logsDB.getNextTimeout()); auto start = eggsNow(); if (likely(pulled > 0)) { LOG_DEBUG(_env, "pulled %s requests from write queue", pulled); _shared.pulledWriteRequests = _shared.pulledWriteRequests*0.95 + ((double)pulled)*0.05; } if (unlikely(_shared.writerRequestsQueue.isClosed())) { // queue is closed, stop stop(); return; } for(auto& request : _requests) { switch (request.kind()) { case WriterQueueEntryKind::LOGSDB_REQUEST: _logsDBRequests.emplace_back(request.moveLogsDBRequest()); break; case WriterQueueEntryKind::LOGSDB_RESPONSE: _logsDBResponses.emplace_back(request.moveLogsDBResponse()); break; case WriterQueueEntryKind::SHARD_LOG_ENTRY: _logEntries.emplace_back(request.moveQueuedShardLogEntry()); break; case WriterQueueEntryKind::SNAPSHOT_REQUEST: _snapshotRequest = request.moveSnapshotRequest(); break; } } logsDBStep(); auto loopTime = eggsNow() - start; } }; struct ShardRegisterer : PeriodicLoop { private: ShardShared& _shared; const ShardReplicaId _shrid; const uint8_t _location; const bool _dontDoReplication; const std::string _shuckleHost; const uint16_t _shucklePort; XmonNCAlert _alert; public: ShardRegisterer(Logger& logger, std::shared_ptr& xmon, ShardShared& shared) : PeriodicLoop(logger, xmon, "registerer", {1_sec, 1, 1_mins, 0.1}), _shared(shared), _shrid(_shared.options.shrid), _location(_shared.options.location), _dontDoReplication(_shared.options.dontDoReplication), _shuckleHost(_shared.options.shuckleHost), _shucklePort(_shared.options.shucklePort) {} virtual ~ShardRegisterer() = default; void init() { _env.updateAlert(_alert, "Waiting to register ourselves for the first time"); } virtual bool periodicStep() { { LOG_INFO(_env, "Registering ourselves (shard %s, location %s, %s) with shuckle", _shrid, (int)_location, _shared.sock().addr()); const auto [err, errStr] = registerShard(_shuckleHost, _shucklePort, 10_sec, _shrid, _location, _shared.isLeader.load(std::memory_order_relaxed), _shared.sock().addr()); if (err == EINTR) { return false; } if (err) { _env.updateAlert(_alert, "Couldn't register ourselves with shuckle: %s", errStr); return false; } } { std::array replicas; LOG_INFO(_env, "Fetching replicas for shardId %s from shuckle", _shrid.shardId()); const auto [err, errStr] = fetchShardReplicas(_shuckleHost, _shucklePort, 10_sec, _shrid, replicas); if (err == EINTR) { return false; } if (err) { _env.updateAlert(_alert, "Failed getting shard replicas from shuckle: %s", errStr); return false; } if (_shared.sock().addr() != replicas[_shrid.replicaId().u8]) { _env.updateAlert(_alert, "AddrsInfo in shuckle: %s , not matching local AddrsInfo: %s", replicas[_shrid.replicaId().u8], _shared.sock().addr()); return false; } if (unlikely(!_shared.replicas)) { size_t emptyReplicas{0}; for (auto& replica : replicas) { if (replica.addrs[0].port == 0) { ++emptyReplicas; } } if (emptyReplicas > 0 && !_dontDoReplication) { _env.updateAlert(_alert, "Didn't get enough replicas with known addresses from shuckle"); return false; } } if (unlikely(!_shared.replicas || *_shared.replicas != replicas)) { LOG_DEBUG(_env, "Updating replicas to %s %s %s %s %s", replicas[0], replicas[1], replicas[2], replicas[3], replicas[4]); std::atomic_exchange(&_shared.replicas, std::make_shared>(replicas)); } } _env.clearAlert(_alert); return true; } }; struct ShardBlockServiceUpdater : PeriodicLoop { private: ShardShared& _shared; ShardReplicaId _shrid; std::string _shuckleHost; uint16_t _shucklePort; XmonNCAlert _alert; std::vector _blockServices; std::vector _currentBlockServices; bool _updatedOnce; public: ShardBlockServiceUpdater(Logger& logger, std::shared_ptr& xmon, ShardShared& shared): PeriodicLoop(logger, xmon, "bs_updater", {1_sec, shared.isLeader.load(std::memory_order_relaxed) ? 2_mins : 1_mins}), _shared(shared), _shrid(_shared.options.shrid), _shuckleHost(_shared.options.shuckleHost), _shucklePort(_shared.options.shucklePort), _updatedOnce(false) { _env.updateAlert(_alert, "Waiting to fetch block services for the first time"); } virtual bool periodicStep() override { if (!_blockServices.empty()) { // We delayed applying cache update most likely we were leader. We should apply it now _shared.blockServicesCache.updateCache(_blockServices, _currentBlockServices); _blockServices.clear(); _currentBlockServices.clear(); } LOG_INFO(_env, "about to fetch block services from %s:%s", _shuckleHost, _shucklePort); const auto [err, errStr] = fetchBlockServices(_shuckleHost, _shucklePort, 10_sec, _shrid.shardId(), _blockServices, _currentBlockServices); if (err == EINTR) { return false; } if (err) { _env.updateAlert(_alert, "could not reach shuckle: %s", errStr); return false; } if (_blockServices.empty()) { _env.updateAlert(_alert, "got no block services"); return false; } // We immediately update cache if we are leader and delay until next iteration on leader unless this is first update which we apply immediately if (!_shared.isLeader.load(std::memory_order_relaxed) || !_updatedOnce) { _updatedOnce = true; _shared.blockServicesCache.updateCache(_blockServices, _currentBlockServices); _blockServices.clear(); _currentBlockServices.clear(); LOG_DEBUG(_env, "updated block services"); } _env.clearAlert(_alert); return true; } }; static void logsDBstatsToMetrics(struct MetricsBuilder& metricsBuilder, const LogsDBStats& stats, ShardReplicaId shrid, EggsTime now) { { metricsBuilder.measurement("eggsfs_shard_logsdb"); metricsBuilder.tag("shard", shrid); metricsBuilder.tag("leader", stats.isLeader.load(std::memory_order_relaxed)); metricsBuilder.fieldU64( "idle_time", stats.idleTime.load(std::memory_order_relaxed).ns); metricsBuilder.timestamp(now); } { metricsBuilder.measurement("eggsfs_shard_logsdb"); metricsBuilder.tag("shard", shrid); metricsBuilder.tag("leader", stats.isLeader.load(std::memory_order_relaxed)); metricsBuilder.fieldU64( "processing_time", stats.processingTime.load(std::memory_order_relaxed).ns); metricsBuilder.timestamp(now); } { metricsBuilder.measurement("eggsfs_shard_logsdb"); metricsBuilder.tag("shard", shrid); metricsBuilder.tag("leader", stats.isLeader.load(std::memory_order_relaxed)); metricsBuilder.fieldU64( "leader_last_active", stats.leaderLastActive.load(std::memory_order_relaxed).ns); metricsBuilder.timestamp(now); } { metricsBuilder.measurement("eggsfs_shard_logsdb"); metricsBuilder.tag("shard", shrid); metricsBuilder.tag("leader", stats.isLeader.load(std::memory_order_relaxed)); metricsBuilder.fieldFloat( "append_window", stats.appendWindow.load(std::memory_order_relaxed)); metricsBuilder.timestamp(now); } { metricsBuilder.measurement("eggsfs_shard_logsdb"); metricsBuilder.tag("shard", shrid); metricsBuilder.tag("leader", stats.isLeader.load(std::memory_order_relaxed)); metricsBuilder.fieldFloat( "entries_released", stats.entriesReleased.load(std::memory_order_relaxed)); metricsBuilder.timestamp(now); } { metricsBuilder.measurement("eggsfs_shard_logsdb"); metricsBuilder.tag("shard", shrid); metricsBuilder.tag("leader", stats.isLeader.load(std::memory_order_relaxed)); metricsBuilder.fieldFloat( "follower_lag", stats.followerLag.load(std::memory_order_relaxed)); metricsBuilder.timestamp(now); } { metricsBuilder.measurement("eggsfs_shard_logsdb"); metricsBuilder.tag("shard", shrid); metricsBuilder.tag("leader", stats.isLeader.load(std::memory_order_relaxed)); metricsBuilder.fieldFloat( "reader_lag", stats.readerLag.load(std::memory_order_relaxed)); metricsBuilder.timestamp(now); } { metricsBuilder.measurement("eggsfs_shard_logsdb"); metricsBuilder.tag("shard", shrid); metricsBuilder.tag("leader", stats.isLeader.load(std::memory_order_relaxed)); metricsBuilder.fieldFloat( "catchup_window", stats.catchupWindow.load(std::memory_order_relaxed)); metricsBuilder.timestamp(now); } { metricsBuilder.measurement("eggsfs_shard_logsdb"); metricsBuilder.tag("shard", shrid); metricsBuilder.tag("leader", stats.isLeader.load(std::memory_order_relaxed)); metricsBuilder.fieldFloat( "entries_read", stats.entriesRead.load(std::memory_order_relaxed)); metricsBuilder.timestamp(now); } { metricsBuilder.measurement("eggsfs_shard_logsdb"); metricsBuilder.tag("shard", shrid); metricsBuilder.tag("leader", stats.isLeader.load(std::memory_order_relaxed)); metricsBuilder.fieldFloat( "requests_received", stats.requestsReceived.load(std::memory_order_relaxed)); metricsBuilder.timestamp(now); } { metricsBuilder.measurement("eggsfs_shard_logsdb"); metricsBuilder.tag("shard", shrid); metricsBuilder.tag("leader", stats.isLeader.load(std::memory_order_relaxed)); metricsBuilder.fieldFloat( "responses_received", stats.requestsReceived.load(std::memory_order_relaxed)); metricsBuilder.timestamp(now); } { metricsBuilder.measurement("eggsfs_shard_logsdb"); metricsBuilder.tag("shard", shrid); metricsBuilder.tag("leader", stats.isLeader.load(std::memory_order_relaxed)); metricsBuilder.fieldFloat( "requests_sent", stats.requestsSent.load(std::memory_order_relaxed)); metricsBuilder.timestamp(now); } { metricsBuilder.measurement("eggsfs_shard_logsdb"); metricsBuilder.tag("shard", shrid); metricsBuilder.tag("leader", stats.isLeader.load(std::memory_order_relaxed)); metricsBuilder.fieldFloat( "responses_sent", stats.responsesSent.load(std::memory_order_relaxed)); metricsBuilder.timestamp(now); } { metricsBuilder.measurement("eggsfs_shard_logsdb"); metricsBuilder.tag("shard", shrid); metricsBuilder.tag("leader", stats.isLeader.load(std::memory_order_relaxed)); metricsBuilder.fieldFloat( "requests_timedout", stats.requestsTimedOut.load(std::memory_order_relaxed)); metricsBuilder.timestamp(now); } { metricsBuilder.measurement("eggsfs_shard_logsdb"); metricsBuilder.tag("shard", shrid); metricsBuilder.tag("leader", stats.isLeader.load(std::memory_order_relaxed)); metricsBuilder.fieldU64( "current_epoch", stats.currentEpoch.load(std::memory_order_relaxed)); metricsBuilder.timestamp(now); } } struct ShardMetricsInserter : PeriodicLoop { private: ShardShared& _shared; ShardReplicaId _shrid; XmonNCAlert _sendMetricsAlert; MetricsBuilder _metricsBuilder; std::unordered_map _rocksDBStats; std::array _sockQueueAlerts; XmonNCAlert _writeQueueAlert; public: ShardMetricsInserter(Logger& logger, std::shared_ptr& xmon, ShardShared& shared): PeriodicLoop(logger, xmon, "metrics", {1_sec, 1.0, 1_mins, 0.1}), _shared(shared), _shrid(_shared.options.shrid), _sendMetricsAlert(XmonAppType::DAYTIME, 5_mins), _sockQueueAlerts({XmonAppType::NEVER, XmonAppType::NEVER}), _writeQueueAlert(XmonAppType::NEVER) {} virtual ~ShardMetricsInserter() = default; virtual bool periodicStep() { _shared.sharedDB.dumpRocksDBStatistics(); for (int i = 0; i < 2; i++) { if (std::ceil(_shared.receivedRequests[i]) >= MAX_RECV_MSGS) { _env.updateAlert(_sockQueueAlerts[i], "recv queue for sock %s is full (%s)", i, _shared.receivedRequests[i]); } else { _env.clearAlert(_sockQueueAlerts[i]); } } if (std::ceil(_shared.logEntriesQueueSize) >= LOG_ENTRIES_QUEUE_SIZE) { _env.updateAlert(_writeQueueAlert, "write queue is full (%s)", _shared.logEntriesQueueSize); } else { _env.clearAlert(_writeQueueAlert); } auto now = eggsNow(); for (ShardMessageKind kind : allShardMessageKind) { const ErrorCount& errs = _shared.errors[(int)kind]; for (int i = 0; i < errs.count.size(); i++) { uint64_t count = errs.count[i].load(); if (count == 0) { continue; } _metricsBuilder.measurement("eggsfs_shard_requests"); _metricsBuilder.tag("shard", _shrid); _metricsBuilder.tag("kind", kind); _metricsBuilder.tag("write", !readOnlyShardReq(kind)); if (i == 0) { _metricsBuilder.tag("error", "NO_ERROR"); } else { _metricsBuilder.tag("error", (EggsError)i); } _metricsBuilder.fieldU64("count", count); _metricsBuilder.timestamp(now); } } { _metricsBuilder.measurement("eggsfs_shard_write_queue"); _metricsBuilder.tag("shard", _shrid); _metricsBuilder.fieldFloat("size", _shared.logEntriesQueueSize); _metricsBuilder.timestamp(now); } for (int i = 0; i < _shared.receivedRequests.size(); i++) { _metricsBuilder.measurement("eggsfs_shard_received_requests"); _metricsBuilder.tag("shard", _shrid); _metricsBuilder.tag("socket", i); _metricsBuilder.fieldFloat("count", _shared.receivedRequests[i]); _metricsBuilder.timestamp(now); } { _metricsBuilder.measurement("eggsfs_shard_pulled_write_requests"); _metricsBuilder.tag("shard", _shrid); _metricsBuilder.fieldFloat("count", _shared.pulledWriteRequests); _metricsBuilder.timestamp(now); } { _rocksDBStats.clear(); _shared.sharedDB.rocksDBMetrics(_rocksDBStats); for (const auto& [name, value]: _rocksDBStats) { _metricsBuilder.measurement("eggsfs_shard_rocksdb"); _metricsBuilder.tag("shard", _shrid); _metricsBuilder.fieldU64(name, value); _metricsBuilder.timestamp(now); } } logsDBstatsToMetrics(_metricsBuilder, _shared.logsDB.getStats(), _shrid, now); std::string err = sendMetrics(10_sec, _metricsBuilder.payload()); _metricsBuilder.reset(); if (err.empty()) { LOG_INFO(_env, "Sent metrics to influxdb"); _env.clearAlert(_sendMetricsAlert); return true; } else { _env.updateAlert(_sendMetricsAlert, "Could not insert metrics: %s", err); return false; } } }; void runShard(ShardOptions& options) { int logOutFd = STDOUT_FILENO; if (!options.logFile.empty()) { logOutFd = open(options.logFile.c_str(), O_WRONLY|O_CREAT|O_APPEND, 0644); if (logOutFd < 0) { throw SYSCALL_EXCEPTION("open"); } } Logger logger(options.logLevel, logOutFd, options.syslog, true); std::shared_ptr xmon; if (options.xmon) { xmon = std::make_shared(); } Env env(logger, xmon, "startup"); { LOG_INFO(env, "Running shard %s at location %s, with options:", options.shrid, (int)options.location); LOG_INFO(env, " level = %s", options.logLevel); LOG_INFO(env, " logFile = '%s'", options.logFile); LOG_INFO(env, " shuckleHost = '%s'", options.shuckleHost); LOG_INFO(env, " shucklePort = %s", options.shucklePort); LOG_INFO(env, " ownAddres = %s", options.shardAddrs); LOG_INFO(env, " simulateOutgoingPacketDrop = %s", options.simulateOutgoingPacketDrop); LOG_INFO(env, " syslog = %s", (int)options.syslog); LOG_INFO(env, "Using LogsDB with options:"); LOG_INFO(env, " dontDoReplication = '%s'", (int)options.dontDoReplication); LOG_INFO(env, " forceLeader = '%s'", (int)options.forceLeader); LOG_INFO(env, " forcedLastReleased = '%s'", options.forcedLastReleased); } // Immediately start xmon: we want the database initializing update to // be there. std::vector> threads; if (xmon) { XmonConfig config; { std::ostringstream ss; ss << "eggsshard" << options.appNameSuffix << "_" << std::setfill('0') << std::setw(3) << options.shrid.shardId() << "_" << options.shrid.replicaId(); config.appInstance = ss.str(); } config.prod = options.xmonProd; config.appType = XmonAppType::CRITICAL; threads.emplace_back(LoopThread::Spawn(std::make_unique(logger, xmon, config))); } // then everything else XmonNCAlert dbInitAlert; env.updateAlert(dbInitAlert, "initializing database"); SharedRocksDB sharedDB(logger, xmon, options.dbDir + "/db", options.dbDir + "/db-statistics.txt"); sharedDB.registerCFDescriptors(ShardDB::getColumnFamilyDescriptors()); sharedDB.registerCFDescriptors(LogsDB::getColumnFamilyDescriptors()); sharedDB.registerCFDescriptors(BlockServicesCacheDB::getColumnFamilyDescriptors()); rocksdb::Options rocksDBOptions; rocksDBOptions.create_if_missing = true; rocksDBOptions.create_missing_column_families = true; rocksDBOptions.compression = rocksdb::kLZ4Compression; rocksDBOptions.bottommost_compression = rocksdb::kZSTD; // 1000*256 = 256k open files at once, given that we currently run on a // single machine this is appropriate. rocksDBOptions.max_open_files = 1000; // We batch writes and flush manually. rocksDBOptions.manual_wal_flush = true; sharedDB.open(rocksDBOptions); BlockServicesCacheDB blockServicesCache(logger, xmon, sharedDB); ShardDB shardDB(logger, xmon, options.shrid.shardId(), options.transientDeadlineInterval, sharedDB, blockServicesCache); LogsDB logsDB(logger, xmon, sharedDB, options.shrid.replicaId(), shardDB.lastAppliedLogEntry(), options.dontDoReplication, options.forceLeader, !options.forceLeader, options.forcedLastReleased); env.clearAlert(dbInitAlert); if (options.dontDoReplication) { options.forcedLastReleased.u64 = shardDB.lastAppliedLogEntry(); } ShardShared shared(options, sharedDB, blockServicesCache, shardDB, logsDB, UDPSocketPair(env, options.shardAddrs)); threads.emplace_back(LoopThread::Spawn(std::make_unique(logger, xmon, shared))); threads.emplace_back(LoopThread::Spawn(std::make_unique(logger, xmon, shared))); threads.emplace_back(LoopThread::Spawn(std::make_unique(logger, xmon, shared))); threads.emplace_back(LoopThread::Spawn(std::make_unique(logger, xmon, shared))); if (options.metrics) { threads.emplace_back(LoopThread::Spawn(std::make_unique(logger, xmon, shared))); } // from this point on termination on SIGINT/SIGTERM will be graceful LoopThread::waitUntilStopped(threads); threads.clear(); shardDB.close(); sharedDB.close(); LOG_INFO(env, "shard terminating gracefully, bye."); }