diff --git a/cpp/core/Bincode.hpp b/cpp/core/Bincode.hpp index aab50daa..ebd38392 100644 --- a/cpp/core/Bincode.hpp +++ b/cpp/core/Bincode.hpp @@ -464,6 +464,11 @@ std::ostream& operator<<(std::ostream& out, const ProtocolMessage +std::ostream& operator<<(std::ostream& out, const SignedProtocolMessage& msg) { + return out << msg.id << " : " << msg.body; +} + constexpr size_t DEFAULT_UDP_MTU = 1472; // 1500 - IP header - ICMP header constexpr size_t MAX_UDP_MTU = 8972; // 9000 - IP header - ICMP header diff --git a/cpp/core/Msgs.hpp b/cpp/core/Msgs.hpp index 8ed8dbb8..6887fc71 100644 --- a/cpp/core/Msgs.hpp +++ b/cpp/core/Msgs.hpp @@ -407,6 +407,7 @@ std::ostream& operator<<(std::ostream& out, BlockServiceId crc); // we reserve 3 bits so that we can fit ReplicaId in LeaderToken struct LogIdx { + static constexpr size_t STATIC_SIZE = sizeof(uint64_t); uint64_t u64; constexpr LogIdx(): u64(0) {} @@ -444,6 +445,10 @@ struct LogIdx { u64 = buf.unpackScalar(); } + constexpr size_t packedSize() const { + return STATIC_SIZE; + } + constexpr bool valid() const { return u64 < 0x2000000000000000ull; } diff --git a/cpp/core/Protocol.hpp b/cpp/core/Protocol.hpp index 564178c7..180f7d5f 100644 --- a/cpp/core/Protocol.hpp +++ b/cpp/core/Protocol.hpp @@ -14,6 +14,14 @@ constexpr uint32_t SHARD_RESP_PROTOCOL_VERSION = 0x1414853; // '2414853' constexpr uint32_t SHARD_LOG_PROTOCOL_VERSION = 0x2414853; +// >>> format(struct.unpack('>> format(struct.unpack('>> format(struct.unpack('; using SignedShardReqMsg = SignedProtocolMessage; using ShardRespMsg = ProtocolMessage; @@ -45,3 +84,5 @@ using CDCReqMsg = ProtocolMessage; using CDCRespMsg = ProtocolMessage; using LogReqMsg = SignedProtocolMessage; using LogRespMsg = SignedProtocolMessage; +using ShardCheckPointedReqMsg = SignedProtocolMessage; +using ShardCheckPointedRespMsg = SignedProtocolMessage; diff --git a/cpp/shard/Shard.cpp b/cpp/shard/Shard.cpp index 9a95da21..164c2fbe 100644 --- a/cpp/shard/Shard.cpp +++ b/cpp/shard/Shard.cpp @@ -19,6 +19,7 @@ #include "LogsDB.hpp" #include "Loop.hpp" #include "Metrics.hpp" +#include "MsgsGen.hpp" #include "MultiplexedChannel.hpp" #include "PeriodicLoop.hpp" #include "Protocol.hpp" @@ -43,6 +44,7 @@ struct QueuedShardLogEntry { IpPort clientAddr; int sockIx; // which sock to use to reply ShardMessageKind requestKind; + bool checkPointed; }; struct SnapshotRequest { @@ -277,6 +279,49 @@ static void packShardResponse( LOG_DEBUG(env, "will send response for req id %s kind %s to %s", msg.id, reqKind, clientAddr); } +static void packCheckPointedShardResponse( + Env& env, + ShardShared& shared, + const AddrsInfo& srcAddr, + UDPSender& sender, + Duration elapsed, + bool dropArtificially, + const IpPort& clientAddr, + uint8_t sockIx, + ShardMessageKind reqKind, + const ShardCheckPointedRespMsg& msg, + const AES128Key& key +) { + auto respKind = msg.body.resp.kind(); + shared.timings[(int)reqKind].add(elapsed); + shared.errors[(int)reqKind].add( respKind != ShardMessageKind::ERROR ? EggsError::NO_ERROR : msg.body.resp.getError()); + if (unlikely(dropArtificially)) { + LOG_DEBUG(env, "artificially dropping response %s", msg.id); + return; + } + ALWAYS_ASSERT(clientAddr.port != 0); + + if (respKind != ShardMessageKind::ERROR) { + LOG_DEBUG(env, "successfully processed request %s with kind %s in %s", msg.id, reqKind, elapsed); + if (bigResponse(respKind)) { + if (unlikely(env._shouldLog(LogLevel::LOG_TRACE))) { + LOG_TRACE(env, "resp: %s", msg); + } else { + LOG_DEBUG(env, "resp: "); + } + } else { + LOG_DEBUG(env, "resp: %s", msg); + } + } else { + LOG_DEBUG(env, "request %s failed with error %s in %s", reqKind, msg.body.resp.getError(), elapsed); + } + sender.prepareOutgoingMessage(env, srcAddr, sockIx, clientAddr, + [&msg, &key](BincodeBuf& buf) { + msg.pack(buf, key); + }); + LOG_DEBUG(env, "will send response for req id %s kind %s to %s", msg.id, reqKind, clientAddr); +} + // a hack while parts of messages in shard protocol are signed and part are unsigned static ShardMessageKind peekKind(const BincodeBuf& buf) { auto tmpBuf = buf; @@ -305,7 +350,7 @@ private: std::unique_ptr> _receiver; std::unique_ptr _sender; - std::unique_ptr{SHARD_REQ_PROTOCOL_VERSION, LOG_REQ_PROTOCOL_VERSION, LOG_RESP_PROTOCOL_VERSION}>> _channel; + std::unique_ptr{SHARD_REQ_PROTOCOL_VERSION, SHARD_CHECK_POINTED_REQ_PROTOCOL_VERSION, LOG_REQ_PROTOCOL_VERSION, LOG_RESP_PROTOCOL_VERSION}>> _channel; public: ShardServer(Logger& logger, std::shared_ptr& xmon, ShardReplicaId shrid, const ShardOptions& options, ShardShared& shared) : Loop(logger, xmon, "server"), @@ -326,7 +371,7 @@ public: expandKey(CDCKey, _expandedCDCKey); _receiver = std::make_unique>(UDPReceiverConfig{.perSockMaxRecvMsg = MAX_RECV_MSGS, .maxMsgSize = MAX_UDP_MTU}); _sender = std::make_unique(UDPSenderConfig{.maxMsgSize = MAX_UDP_MTU}); - _channel = std::make_unique{SHARD_REQ_PROTOCOL_VERSION, LOG_REQ_PROTOCOL_VERSION, LOG_RESP_PROTOCOL_VERSION}>>(); + _channel = std::make_unique{SHARD_REQ_PROTOCOL_VERSION, SHARD_CHECK_POINTED_REQ_PROTOCOL_VERSION, LOG_REQ_PROTOCOL_VERSION, LOG_RESP_PROTOCOL_VERSION}>>(); } virtual ~ShardServer() = default; @@ -393,17 +438,21 @@ private: return LogsDB::REPLICA_COUNT; } - void _handleShardRequest(UDPMessage& msg) { + void _handleShardRequest(UDPMessage& msg, bool checkPointed) { LOG_DEBUG(_env, "received message from %s", msg.clientAddr); if (unlikely(!_shared.isLeader.load(std::memory_order_relaxed))) { LOG_DEBUG(_env, "not leader, dropping request %s", msg.clientAddr); return; } - // First, try to parse the header ShardReqMsg req; try { - if (isPrivilegedRequestKind((uint8_t)peekKind(msg.buf))) { + if (checkPointed) { + ShardCheckPointedReqMsg signedReq; + signedReq.unpack(msg.buf, _expandedCDCKey); + req.id = signedReq.id; + req.body = std::move(signedReq.body); + } else if (isPrivilegedRequestKind((uint8_t)peekKind(msg.buf))) { SignedShardReqMsg signedReq; signedReq.unpack(msg.buf, _expandedCDCKey); req.id = signedReq.id; @@ -435,9 +484,10 @@ private: // if it's a write request we prepare the log entry and // send it off. ShardRespMsg respContainer; - respContainer.id = req.id; + ShardCheckPointedRespMsg checkpointedRespContainer; + checkpointedRespContainer.id = respContainer.id = req.id; if (readOnlyShardReq(req.body.kind())) { - _shared.shardDB.read(req.body, respContainer.body); + checkpointedRespContainer.body.checkPointIdx = _shared.shardDB.read(req.body, checkPointed ? checkpointedRespContainer.body.resp : respContainer.body); } else if (unlikely(req.body.kind() == ShardMessageKind::SHARD_SNAPSHOT)) { auto& entry = _logEntries.emplace_back().setSnapshotRequest(); entry.sockIx = msg.socketIx; @@ -453,18 +503,25 @@ private: entry.receivedAt = t0; entry.requestKind = req.body.kind(); entry.requestId = req.id; + entry.checkPointed = checkPointed; auto err = _shared.shardDB.prepareLogEntry(req.body, entry.logEntry); if (likely(err == EggsError::NO_ERROR)) { return; // we're done here, move along } else { - respContainer.body.setError() = err; + (checkPointed ? checkpointedRespContainer.body.resp : respContainer.body).setError() = err; _logEntries.pop_back(); // back out the log entry } } Duration elapsed = eggsNow() - t0; bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability; - packShardResponse(_env, _shared, _shared.sock().addr(), *_sender, elapsed, dropArtificially, msg.clientAddr, msg.socketIx, req.body.kind(), respContainer); + if (checkPointed) { + packCheckPointedShardResponse(_env, _shared, _shared.sock().addr(), *_sender, elapsed, dropArtificially, msg.clientAddr, msg.socketIx, req.body.kind(), checkpointedRespContainer, _expandedCDCKey); + + } else { + packShardResponse(_env, _shared, _shared.sock().addr(), *_sender, elapsed, dropArtificially, msg.clientAddr, msg.socketIx, req.body.kind(), respContainer); + } + } public: @@ -489,7 +546,11 @@ public: } std::array shardMsgCount{0, 0}; for (auto& msg : _channel->protocolMessages(SHARD_REQ_PROTOCOL_VERSION)) { - _handleShardRequest(msg); + _handleShardRequest(msg, false); + ++shardMsgCount[msg.socketIx]; + } + for (auto& msg : _channel->protocolMessages(SHARD_CHECK_POINTED_REQ_PROTOCOL_VERSION)) { + _handleShardRequest(msg, true); ++shardMsgCount[msg.socketIx]; } @@ -520,8 +581,10 @@ private: const std::string _basePath; ShardShared& _shared; AES128Key _expandedShardKey; + AES128Key _expandedCDCKey; uint64_t _currentLogIndex; ShardRespMsg _respContainer; + ShardCheckPointedRespMsg _checkPointedrespContainer; std::vector _requests; const size_t _maxWritesAtOnce; @@ -560,6 +623,7 @@ public: _outgoingPacketDropProbability(0) { expandKey(ShardKey, _expandedShardKey); + expandKey(CDCKey, _expandedCDCKey); _currentLogIndex = _shared.shardDB.lastAppliedLogEntry(); auto convertProb = [this](const std::string& what, double prob, uint64_t& iprob) { if (prob != 0.0) { @@ -667,8 +731,9 @@ public: ALWAYS_ASSERT(it != _inFlightEntries.end()); ALWAYS_ASSERT(shardEntry == it->second.logEntry); } - - _shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, _respContainer.body); + _checkPointedrespContainer.body.checkPointIdx = logsDBEntry.idx; + auto& resp = (it != _inFlightEntries.end() && it->second.checkPointed) ? _checkPointedrespContainer.body.resp : _respContainer.body; + _shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, resp); if (it != _inFlightEntries.end()) { auto& logEntry = _outgoingLogEntries.emplace_back(std::move(it->second)); _inFlightEntries.erase(it); @@ -680,12 +745,17 @@ public: LOG_DEBUG(_env, "applying request-less log entry"); } if (likely(logEntry.requestId)) { - _respContainer.id = logEntry.requestId; + _checkPointedrespContainer.id = _respContainer.id = logEntry.requestId; Duration elapsed = eggsNow() - logEntry.receivedAt; bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability; - packShardResponse(_env, _shared, _shared.sock().addr(), _sender, elapsed, dropArtificially, logEntry.clientAddr, logEntry.sockIx, logEntry.requestKind, _respContainer); - } else if (unlikely(_respContainer.body.kind() == ShardMessageKind::ERROR)) { - RAISE_ALERT(_env, "could not apply request-less log entry: %s", _respContainer.body.getError()); + if (logEntry.checkPointed) { + packCheckPointedShardResponse(_env, _shared, _shared.sock().addr(), _sender, elapsed, dropArtificially, logEntry.clientAddr, logEntry.sockIx, logEntry.requestKind, _checkPointedrespContainer, _expandedCDCKey); + } else { + packShardResponse(_env, _shared, _shared.sock().addr(), _sender, elapsed, dropArtificially, logEntry.clientAddr, logEntry.sockIx, logEntry.requestKind, _respContainer); + } + + } else if (unlikely(resp.kind() == ShardMessageKind::ERROR)) { + RAISE_ALERT(_env, "could not apply request-less log entry: %s", resp.getError()); } } } diff --git a/cpp/shard/ShardDB.cpp b/cpp/shard/ShardDB.cpp index 3aa36b6d..3927bd7b 100644 --- a/cpp/shard/ShardDB.cpp +++ b/cpp/shard/ShardDB.cpp @@ -863,7 +863,7 @@ struct ShardDBImpl { return _visitInodes(options, _filesCf, req, resp); } - void read(const ShardReqContainer& req, ShardRespContainer& resp) { + uint64_t read(const ShardReqContainer& req, ShardRespContainer& resp) { LOG_DEBUG(_env, "processing read-only request of kind %s", req.kind()); auto err = EggsError::NO_ERROR; @@ -916,6 +916,7 @@ struct ShardDBImpl { } else { ALWAYS_ASSERT(req.kind() == resp.kind()); } + return _lastAppliedLogEntry(options); } @@ -1703,7 +1704,7 @@ struct ShardDBImpl { // log application void _advanceLastAppliedLogEntry(rocksdb::WriteBatch& batch, uint64_t index) { - uint64_t oldIndex = _lastAppliedLogEntry(); + uint64_t oldIndex = _lastAppliedLogEntry({}); ALWAYS_ASSERT(oldIndex+1 == index, "old index is %s, expected %s, got %s", oldIndex, oldIndex+1, index); LOG_DEBUG(_env, "bumping log index from %s to %s", oldIndex, index); StaticValue v; @@ -3590,9 +3591,9 @@ struct ShardDBImpl { return cbcmac(_expandedSecretKey, (const uint8_t*)&id, sizeof(id)); } - uint64_t _lastAppliedLogEntry() { + uint64_t _lastAppliedLogEntry(const rocksdb::ReadOptions& options) { std::string value; - ROCKS_DB_CHECKED(_db->Get({}, shardMetadataKey(&LAST_APPLIED_LOG_ENTRY_KEY), &value)); + ROCKS_DB_CHECKED(_db->Get(options, shardMetadataKey(&LAST_APPLIED_LOG_ENTRY_KEY), &value)); ExternalValue v(value); return v().u64(); } @@ -3826,8 +3827,8 @@ ShardDB::~ShardDB() { _impl = nullptr; } -void ShardDB::read(const ShardReqContainer& req, ShardRespContainer& resp) { - ((ShardDBImpl*)_impl)->read(req, resp); +uint64_t ShardDB::read(const ShardReqContainer& req, ShardRespContainer& resp) { + return ((ShardDBImpl*)_impl)->read(req, resp); } EggsError ShardDB::prepareLogEntry(const ShardReqContainer& req, ShardLogEntry& logEntry) { @@ -3839,7 +3840,7 @@ void ShardDB::applyLogEntry(uint64_t logEntryIx, const ShardLogEntry& logEntry, } uint64_t ShardDB::lastAppliedLogEntry() { - return ((ShardDBImpl*)_impl)->_lastAppliedLogEntry(); + return ((ShardDBImpl*)_impl)->_lastAppliedLogEntry({}); } const std::array& ShardDB::secretKey() const { diff --git a/cpp/shard/ShardDB.hpp b/cpp/shard/ShardDB.hpp index 9779521f..7dbb3131 100644 --- a/cpp/shard/ShardDB.hpp +++ b/cpp/shard/ShardDB.hpp @@ -51,7 +51,8 @@ public: void close(); // Performs a read-only request, responding immediately. - void read(const ShardReqContainer& req, ShardRespContainer& resp); + // Returns last applied log entry at the point of reading + uint64_t read(const ShardReqContainer& req, ShardRespContainer& resp); // Prepares and persists a log entry to be applied. //