From 27faaa45ae3eefa93681ff0a5e971454b92fa0ba Mon Sep 17 00:00:00 2001
From: Miroslav Crnic
Date: Fri, 15 Mar 2024 16:49:39 +0000
Subject: [PATCH] ci: add ability to run with LogsDB, shard: add handling of
 LogsDB messages

---
 ci.py                               |   7 +-
 cpp/core/Bincode.hpp                |   3 +
 cpp/core/CDCKey.hpp                 |   4 +-
 cpp/core/LogsDB.cpp                 |   5 +-
 cpp/core/Msgs.hpp                   |   1 -
 cpp/core/SPSC.hpp                   |   4 +
 cpp/shard/Shard.cpp                 | 708 +++++++++++++++++++++++-----
 cpp/shard/ShardDB.cpp               |   2 +
 cpp/shard/ShardDB.hpp               |   1 +
 cpp/shard/ShardKey.hpp              |   7 +
 go/eggsrun/eggsrun.go               |  48 +-
 go/eggstests/eggstests.go           |  49 +-
 go/managedprocess/managedprocess.go |  16 +-
 integration.py                      |   8 +-
 14 files changed, 710 insertions(+), 153 deletions(-)
 create mode 100644 cpp/shard/ShardKey.hpp

diff --git a/ci.py b/ci.py
index c4c1a41d..736cca22 100755
--- a/ci.py
+++ b/ci.py
@@ -8,6 +8,7 @@ parser = argparse.ArgumentParser()
 parser.add_argument('--functional', action='store_true')
 parser.add_argument('--integration', action='store_true')
 parser.add_argument('--short', action='store_true')
+parser.add_argument('--logsdb', action='store_true')
 parser.add_argument('--kmod', action='store_true')
 parser.add_argument('--build', action='store_true')
 parser.add_argument('--docker', action='store_true', help='Build and run in docker image')
@@ -58,11 +59,11 @@ if args.integration:
         # the `--pids-limit -1` is not something I hit but it seems
         # like a good idea.
         run_cmd_unbuffered(
-            ['docker', 'run', '--pids-limit', '-1', '--security-opt', 'seccomp=unconfined', '--cap-add', 'SYS_ADMIN', '-v', '/dev/fuse:/dev/fuse', '--privileged', '--rm', '-i', '--mount', f'type=bind,src={script_dir},dst=/eggsfs', '-e', f'UID={os.getuid()}', '-e', f'GID={os.getgid()}', container, '/eggsfs/integration.py', '--docker'] + (['--short'] if args.short else [])
+            ['docker', 'run', '--pids-limit', '-1', '--security-opt', 'seccomp=unconfined', '--cap-add', 'SYS_ADMIN', '-v', '/dev/fuse:/dev/fuse', '--privileged', '--rm', '-i', '--mount', f'type=bind,src={script_dir},dst=/eggsfs', '-e', f'UID={os.getuid()}', '-e', f'GID={os.getgid()}', container, '/eggsfs/integration.py', '--docker'] + (['--short'] if args.short else []) + (['--logsdb'] if args.logsdb else [])
        )
    else:
        run_cmd_unbuffered(
-            ['./integration.py'] + (['--short'] if args.short else [])
+            ['./integration.py'] + (['--short'] if args.short else []) + (['--logsdb'] if args.logsdb else [])
        )

 if args.prepare_image:
@@ -71,4 +72,4 @@ if args.prepare_image:

 if args.kmod:
     bold_print('kmod tests')
-    wait_cmd(run_cmd(['./kmod/ci.sh'] + (['-short'] if args.short else [])))
+    wait_cmd(run_cmd(['./kmod/ci.sh'] + (['-short'] if args.short else []) + (['--logsdb'] if args.logsdb else [])))
diff --git a/cpp/core/Bincode.hpp b/cpp/core/Bincode.hpp
index 08e9ae4c..199be5eb 100644
--- a/cpp/core/Bincode.hpp
+++ b/cpp/core/Bincode.hpp
@@ -360,6 +360,9 @@ struct BincodeBuf {
     template<typename T>
     void unpackList(BincodeList<T>& xs) {
         xs.els.resize(unpackScalar());
+        if (unlikely(xs.els.size() == 0)) {
+            return;
+        }
         // If it's a number of some sort, just memcpy it
         if constexpr (std::is_integral_v<T> || std::is_enum_v<T>) {
             static_assert(std::endian::native == std::endian::little);
diff --git a/cpp/core/CDCKey.hpp b/cpp/core/CDCKey.hpp
index b951919f..d61ef652 100644
--- a/cpp/core/CDCKey.hpp
+++ b/cpp/core/CDCKey.hpp
@@ -1,7 +1,7 @@
 #pragma once

 #include <array>
+#include <cstdint>

-#include "Common.hpp"
-
-static const std::array<uint8_t, 16> CDCKey{0xa1,0x11,0x1c,0xf0,0xf6,0x2b,0xba,0x02,0x25,0xd2,0x66,0xe7,0xa6,0x94,0x86,0xfe};
\ No newline at end of file
+static const std::array<uint8_t, 16> CDCKey{0xa1,0x11,0x1c,0xf0,0xf6,0x2b,0xba,0x02,0x25,0xd2,0x66,0xe7,0xa6,0x94,0x86,0xfe};
diff --git a/cpp/core/LogsDB.cpp b/cpp/core/LogsDB.cpp
index 2e1c1e65..92bb71d9 100644
--- a/cpp/core/LogsDB.cpp
+++ b/cpp/core/LogsDB.cpp
@@ -125,7 +125,7 @@ public:
     LogsDBLogEntry entry() const {
         auto value = _smaller->value();
-        return LogsDBLogEntry{key(), {value.data(), value.data() + value.size()}};
+        return LogsDBLogEntry{key(), {(const uint8_t*)value.data(), (const uint8_t*)value.data() + value.size()}};
     }

     void dropEntry() {
@@ -1576,7 +1576,7 @@ public:
             // Mismatch in responses could be due to network issues: we don't want to crash, but we will ignore and retry.
             // Mismatch in internal state is asserted on.
-            if (unlikely(request->replicaId != resp.header.requestId)) {
+            if (unlikely(request->replicaId != resp.replicaId)) {
                 LOG_ERROR(_env, "Expected response from replica %s, got it from replica %s. Response: %s", request->replicaId, resp.replicaId, resp);
                 continue;
             }
@@ -1625,6 +1625,7 @@ public:
                 break;
             case LogMessageKind::LOG_READ:
                 _catchupReader.proccessLogReadRequest(req.replicaId, req.header.requestId, req.requestContainer.getLogRead());
+                break;
             case LogMessageKind::NEW_LEADER:
                 _leaderElection.proccessNewLeaderRequest(req.replicaId, req.header.requestId, req.requestContainer.getNewLeader());
                 break;
diff --git a/cpp/core/Msgs.hpp b/cpp/core/Msgs.hpp
index 34d9342b..638ee4db 100644
--- a/cpp/core/Msgs.hpp
+++ b/cpp/core/Msgs.hpp
@@ -327,7 +327,6 @@ std::ostream& operator<<(std::ostream& out, BlockServiceId crc);

 struct LogIdx {
     uint64_t u64;
-
     constexpr LogIdx(): u64(0) {}

     constexpr LogIdx(uint64_t idx): u64(idx) {
diff --git a/cpp/core/SPSC.hpp b/cpp/core/SPSC.hpp
index 1e6965ec..ba47f25f 100644
--- a/cpp/core/SPSC.hpp
+++ b/cpp/core/SPSC.hpp
@@ -52,6 +52,10 @@ public:
         }
     }

+    bool isClosed() {
+        return _size.load(std::memory_order_relaxed) & (1ull<<31);
+    }
+
     // Tries to push all the elements. Returns how many were actually
     // pushed. First element in `els` gets pushed first. Returns 0
     // if the queue is closed.
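Note on the new `SPSC::isClosed()`: the shard writer (further down in this patch) now pulls from the queue with a LogsDB-driven timeout, so a pull returning 0 can mean either "nothing yet" or "shut down"; the explicit check tells the two apart. A minimal sketch of the same consumer contract, using a toy lock-based queue rather than the lock-free SPSC in this repo:

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>

// Toy closable queue: pull() can come back empty on a timeout, so consumers
// must ask isClosed() to distinguish "empty right now" from "closed for good".
template <typename T>
class ClosableQueue {
    std::mutex _m;
    std::condition_variable _cv;
    std::queue<T> _q;
    bool _closed = false;
public:
    void push(T v) {
        { std::lock_guard<std::mutex> g(_m); _q.push(std::move(v)); }
        _cv.notify_one();
    }
    void close() {
        { std::lock_guard<std::mutex> g(_m); _closed = true; }
        _cv.notify_all();
    }
    bool isClosed() {
        std::lock_guard<std::mutex> g(_m);
        return _closed;
    }
    std::optional<T> pull(std::chrono::nanoseconds timeout) {
        std::unique_lock<std::mutex> l(_m);
        _cv.wait_for(l, timeout, [&] { return _closed || !_q.empty(); });
        if (_q.empty()) return std::nullopt; // timed out, or closed and drained
        T v = std::move(_q.front());
        _q.pop();
        return v;
    }
};
```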
diff --git a/cpp/shard/Shard.cpp b/cpp/shard/Shard.cpp
index efa52bf8..e32b762c 100644
--- a/cpp/shard/Shard.cpp
+++ b/cpp/shard/Shard.cpp
@@ -1,13 +1,15 @@
 #include
+#include
 #include
-#include
 #include
 #include
+#include
 #include
 #include
 #include
 #include
 #include
+#include
 #include

 #include "Assert.hpp"
@@ -20,8 +22,10 @@
 #include "Msgs.hpp"
 #include "Shard.hpp"
 #include "Env.hpp"
+#include "MsgsGen.hpp"
 #include "MultiplexedChannel.hpp"
 #include "ShardDB.hpp"
+#include "ShardKey.hpp"
 #include "CDCKey.hpp"
 #include "SharedRocksDB.hpp"
 #include "Shuckle.hpp"
@@ -53,6 +57,115 @@
 const int LOG_ENTRIES_QUEUE_SIZE = 8192; // a few megabytes, should be quite a bit
 const int MAX_WRITES_AT_ONCE = 200; // say that each write is ~100bytes, this gives us 20KB of write
 const int MAX_RECV_MSGS = 100;

+enum class WriterQueueEntryKind : uint8_t {
+    LOGSDB_REQUEST = 1,
+    LOGSDB_RESPONSE = 2,
+    SHARD_LOG_ENTRY = 3,
+};
+
+std::ostream& operator<<(std::ostream& out, WriterQueueEntryKind kind) {
+    switch (kind) {
+    case WriterQueueEntryKind::LOGSDB_REQUEST:
+        out << "LOGSDB_REQUEST";
+        break;
+    case WriterQueueEntryKind::LOGSDB_RESPONSE:
+        out << "LOGSDB_RESPONSE";
+        break;
+    case WriterQueueEntryKind::SHARD_LOG_ENTRY:
+        out << "SHARD_LOG_ENTRY";
+        break;
+    default:
+        out << "Unknown WriterQueueEntryKind(" << (int)kind << ")";
+        break;
+    }
+    return out;
+}
+
+class ShardWriterRequest {
+public:
+    ShardWriterRequest() { clear(); }
+    ShardWriterRequest(const ShardWriterRequest& other) {
+        _kind = other._kind;
+        _data = other._data;
+    }
+
+    ShardWriterRequest(ShardWriterRequest&& other) {
+        *this = std::move(other);
+    }
+
+    ShardWriterRequest& operator=(ShardWriterRequest&& other) {
+        _kind = other._kind;
+        _data = std::move(other._data);
+        other.clear();
+        return *this;
+    }
+
+    void clear() { _kind = (WriterQueueEntryKind)0; }
+
+    WriterQueueEntryKind kind() const { return _kind; }
+
+    LogsDBRequest& setLogsDBRequest() {
+        _kind = WriterQueueEntryKind::LOGSDB_REQUEST;
+        auto& x = _data.emplace<0>();
+        return x;
+    }
+
+    const LogsDBRequest& getLogsDBRequest() const {
+        ALWAYS_ASSERT(_kind == WriterQueueEntryKind::LOGSDB_REQUEST, "%s != %s", _kind, WriterQueueEntryKind::LOGSDB_REQUEST);
+        return std::get<0>(_data);
+    }
+
+    LogsDBRequest&& moveLogsDBRequest() {
+        ALWAYS_ASSERT(_kind == WriterQueueEntryKind::LOGSDB_REQUEST, "%s != %s", _kind, WriterQueueEntryKind::LOGSDB_REQUEST);
+        clear();
+        return std::move(std::get<0>(_data));
+    }
+
+    LogsDBResponse& setLogsDBResponse() {
+        _kind = WriterQueueEntryKind::LOGSDB_RESPONSE;
+        auto& x = _data.emplace<1>();
+        return x;
+    }
+
+    const LogsDBResponse& getLogsDBResponse() const {
+        ALWAYS_ASSERT(_kind == WriterQueueEntryKind::LOGSDB_RESPONSE, "%s != %s", _kind, WriterQueueEntryKind::LOGSDB_RESPONSE);
+        return std::get<1>(_data);
+    }
+
+    LogsDBResponse&& moveLogsDBResponse() {
+        ALWAYS_ASSERT(_kind == WriterQueueEntryKind::LOGSDB_RESPONSE, "%s != %s", _kind, WriterQueueEntryKind::LOGSDB_RESPONSE);
+        clear();
+        return std::move(std::get<1>(_data));
+    }
+
+    QueuedShardLogEntry& setQueuedShardLogEntry() {
+        _kind = WriterQueueEntryKind::SHARD_LOG_ENTRY;
+        auto& x = _data.emplace<2>();
+        return x;
+    }
+
+    const QueuedShardLogEntry& getQueuedShardLogEntry() const {
+        ALWAYS_ASSERT(_kind == WriterQueueEntryKind::SHARD_LOG_ENTRY, "%s != %s", _kind, WriterQueueEntryKind::SHARD_LOG_ENTRY);
+        return std::get<2>(_data);
+    }
+
+    QueuedShardLogEntry&& moveQueuedShardLogEntry() {
+        ALWAYS_ASSERT(_kind == WriterQueueEntryKind::SHARD_LOG_ENTRY, "%s != %s", _kind, WriterQueueEntryKind::SHARD_LOG_ENTRY);
+        clear();
+        return std::move(std::get<2>(_data));
+    }
+
+private:
+    WriterQueueEntryKind _kind;
+    std::variant<LogsDBRequest, LogsDBResponse, QueuedShardLogEntry> _data;
+};
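`ShardWriterRequest` wraps a `std::variant` behind explicit kind tags so a default-constructed (kind 0) entry stays distinguishable from a live one, and the `move*` accessors clear the tag as they hand the payload out. The writer's `step()` later drains one queue into three per-kind batches; here is a minimal sketch of that demux with hypothetical stand-in payload types:

```cpp
#include <string>
#include <variant>
#include <vector>

// Stand-ins for LogsDBRequest / LogsDBResponse / QueuedShardLogEntry.
struct Req { int id; };
struct Resp { int id; };
struct Entry { std::string payload; };

using WriterItem = std::variant<Req, Resp, Entry>;

// The writer's demux step in miniature: drain a mixed batch into
// per-kind vectors, moving payloads out rather than copying them.
void demux(std::vector<WriterItem>& batch,
           std::vector<Req>& reqs, std::vector<Resp>& resps, std::vector<Entry>& entries) {
    for (auto& item : batch) {
        switch (item.index()) {
        case 0: reqs.emplace_back(std::move(std::get<0>(item))); break;
        case 1: resps.emplace_back(std::move(std::get<1>(item))); break;
        case 2: entries.emplace_back(std::move(std::get<2>(item))); break;
        }
    }
    batch.clear();
}
```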
+
+struct ReplicaAddrsInfo {
+    std::array<sockaddr_in, 2> addrs;
+};
+
+bool operator==(const sockaddr_in& l, const sockaddr_in& r) {
+    return l.sin_addr.s_addr == r.sin_addr.s_addr && l.sin_port == r.sin_port;
+}

 struct ShardShared {
     SharedRocksDB& sharedDB;
@@ -63,15 +176,15 @@ struct ShardShared {
     std::array socks;
     std::array timings;
     std::array errors;
-    SPSC<QueuedShardLogEntry> logEntriesQueue;
+    SPSC<ShardWriterRequest> writerRequestsQueue;
     std::atomic<double> logEntriesQueueSize;
     std::array<std::atomic<uint64_t>, 2> receivedRequests; // how many requests we got at once from each socket
     std::atomic<double> pulledWriteRequests; // how many requests we got from write queue
-    std::array<AddrsInfo, LogsDB::REPLICA_COUNT> replicas;
-    std::mutex replicasLock;
+    std::shared_ptr<std::array<ReplicaAddrsInfo, LogsDB::REPLICA_COUNT>> replicas;
+    std::atomic<bool> isLeader;

     ShardShared() = delete;
-    ShardShared(SharedRocksDB& sharedDB_, BlockServicesCacheDB& blockServicesCache_, ShardDB& shardDB_): sharedDB(sharedDB_), blockServicesCache(blockServicesCache_), shardDB(shardDB_), ips{0, 0}, ports{0, 0}, logEntriesQueue(LOG_ENTRIES_QUEUE_SIZE), logEntriesQueueSize(0), pulledWriteRequests(0) {
+    ShardShared(SharedRocksDB& sharedDB_, BlockServicesCacheDB& blockServicesCache_, ShardDB& shardDB_): sharedDB(sharedDB_), blockServicesCache(blockServicesCache_), shardDB(shardDB_), ips{0, 0}, ports{0, 0}, writerRequestsQueue(LOG_ENTRIES_QUEUE_SIZE), logEntriesQueueSize(0), pulledWriteRequests(0) {
         for (ShardMessageKind kind : allShardMessageKind) {
             timings[(int)kind] = Timings::Standard();
         }
@@ -101,7 +214,7 @@ static bool bigResponse(ShardMessageKind kind) {
     );
 }

-static void packResponse(
+static void packShardResponse(
     Env& env,
     ShardShared& shared,
     std::vector<char>& sendBuf,
@@ -111,7 +224,7 @@ static void packResponse(
     ShardMessageKind kind,
     Duration elapsed,
     bool dropArtificially,
-    struct sockaddr_in* clientAddr,
+    const struct sockaddr_in* clientAddr,
     int sockIx,
     EggsError err,
     const ShardRespContainer& resp
@@ -156,7 +269,7 @@
     // -- we'll fix up the actual values later.
     auto& hdr = sendHdrs.emplace_back();
     hdr.msg_hdr = {
-        .msg_name = clientAddr,
+        .msg_name = (sockaddr_in*)clientAddr,
         .msg_namelen = sizeof(*clientAddr),
         .msg_iovlen = 1,
     };
@@ -178,6 +291,7 @@
 private:
     // run data
     AES128Key _expandedCDCKey;
+    AES128Key _expandedShardKey;
     // recvmmsg data (one per socket, since we receive from both and send from both
     // using some of the header data)
     std::array<std::vector<char>, 2> _recvBuf;
@@ -188,7 +302,7 @@
     ShardReqContainer _reqContainer;
     ShardRespContainer _respContainer;
     // log entries buffers
-    std::vector<QueuedShardLogEntry> _logEntries;
+    std::vector<ShardWriterRequest> _logEntries;
     // sendmmsg data
     std::vector<char> _sendBuf;
     std::array<std::vector<struct mmsghdr>, 2> _sendHdrs; // one per socket
@@ -215,6 +329,7 @@ public:
         };
         convertProb("incoming", options.simulateIncomingPacketDrop, _incomingPacketDropProbability);
         convertProb("outgoing", options.simulateOutgoingPacketDrop, _outgoingPacketDropProbability);
+        expandKey(ShardKey, _expandedShardKey);
         _init();
     }
@@ -281,7 +396,136 @@ private:
         }
     }

-    void _handleRequest(int sockIx, struct sockaddr_in* clientAddr, BincodeBuf& reqBbuf) {
+    void _handleLogsDBResponse(int sockIx, struct sockaddr_in* clientAddr, BincodeBuf& buf) {
+        LOG_DEBUG(_env, "received LogsDBResponse from %s", *clientAddr);
+
+        auto replicaId = _getReplicaId(clientAddr);
+
+        if (replicaId == LogsDB::REPLICA_COUNT) {
+            LOG_DEBUG(_env, "We can't match this address to a replica. Dropping");
+            return;
+        }
+
+        auto& resp = _logEntries.emplace_back().setLogsDBResponse();
+        resp.replicaId = replicaId;
+
+        try {
+            resp.header.unpack(buf);
+        } catch (const BincodeException& err) {
+            LOG_ERROR(_env, "Could not parse LogsDBResponse.header: %s", err.what());
+            _logEntries.pop_back();
+            return;
+        }
+
+        if (wyhash64(&_packetDropRand) % 10'000 < _incomingPacketDropProbability) {
+            LOG_DEBUG(_env, "artificially dropping LogsDBResponse %s", resp.header.requestId);
+            _logEntries.pop_back();
+            return;
+        }
+
+        try {
+            resp.responseContainer.unpack(buf, resp.header.kind);
+        } catch (const BincodeException& exc) {
+            LOG_ERROR(_env, "Could not parse LogsDBResponse.responseContainer of kind %s: %s", resp.header.kind, exc.what());
+            _logEntries.pop_back();
+            return;
+        }
+
+        if (unlikely(buf.remaining() < 8)) {
+            LOG_ERROR(_env, "Could not parse LogsDBResponse of kind %s from %s. Message signature is 8 bytes, only %s remaining", resp.header.kind, *clientAddr, buf.remaining());
+            _logEntries.pop_back();
+            return;
+        }
+
+        auto expectedMac = cbcmac(_expandedShardKey, buf.data, buf.cursor - buf.data);
+        BincodeFixedBytes<8> receivedMac;
+        buf.unpackFixedBytes<8>(receivedMac);
+        if (unlikely(expectedMac != receivedMac.data)) {
+            LOG_ERROR(_env, "Incorrect signature for LogsDBResponse %s from %s", resp.header.kind, *clientAddr);
+            _logEntries.pop_back();
+            return;
+        }
+
+        if (unlikely(buf.remaining())) {
+            LOG_ERROR(_env, "Malformed message. Extra %s bytes for LogsDBResponse %s from %s", buf.remaining(), resp.header.kind, *clientAddr);
+            _logEntries.pop_back();
+            return;
+        }
+        LOG_DEBUG(_env, "Received response %s for request id %s from replica id %s", resp.header.kind, resp.header.requestId, resp.replicaId);
+    }
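Both LogsDB handlers apply the same trailer rule: header and container are parsed first, then exactly 8 bytes of MAC must remain and nothing after them. A self-contained sketch of that framing check; `dummyMac` is a hypothetical stand-in for this codebase's `cbcmac`/`AES128Key` machinery, not a real primitive:

```cpp
#include <cstdint>
#include <cstring>
#include <optional>
#include <vector>

// Hypothetical stand-in for cbcmac(): any 8-byte keyed digest works for the sketch.
static uint64_t dummyMac(uint64_t key, const uint8_t* p, size_t n) {
    uint64_t h = key ^ 0x9e3779b97f4a7c15ull;
    for (size_t i = 0; i < n; i++) { h = (h ^ p[i]) * 0x100000001b3ull; }
    return h;
}

// Accept the payload only if the message is payload + exactly 8 trailing
// MAC bytes computed over the payload; anything shorter, longer, or with a
// bad digest is rejected, mirroring the handlers above.
std::optional<std::vector<uint8_t>> checkTrailer(uint64_t key, const std::vector<uint8_t>& msg) {
    if (msg.size() < 8) return std::nullopt;       // signature is 8 bytes
    size_t payloadLen = msg.size() - 8;
    uint64_t expected = dummyMac(key, msg.data(), payloadLen);
    uint64_t received;
    std::memcpy(&received, msg.data() + payloadLen, 8);
    if (expected != received) return std::nullopt; // incorrect signature
    return std::vector<uint8_t>(msg.begin(), msg.begin() + payloadLen);
}
```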
Dropping"); + return; + } + + auto& resp = _logEntries.emplace_back().setLogsDBResponse(); + resp.replicaId = replicaId; + + try { + resp.header.unpack(buf); + } catch (const BincodeException& err) { + LOG_ERROR(_env, "Could not parse LogsDBResponse.header: %s", err.what()); + _logEntries.pop_back(); + return; + } + + if (wyhash64(&_packetDropRand) % 10'000 < _incomingPacketDropProbability) { + LOG_DEBUG(_env, "artificially dropping LogsDBResponse %s", resp.header.requestId); + _logEntries.pop_back(); + return; + } + + try { + resp.responseContainer.unpack(buf, resp.header.kind); + } catch (const BincodeException& exc) { + LOG_ERROR(_env, "Could not parse LogsDBResponse.responseContainer of kind %s: %s", resp.header.kind, exc.what()); + _logEntries.pop_back(); + return; + } + + if (unlikely(buf.remaining() < 8)) { + LOG_ERROR(_env, "Could not parse LogsDBResponse of kind %s from %s. Message signature is 8 bytes, only %s remaining", resp.header.kind, *clientAddr, buf.remaining()); + _logEntries.pop_back(); + return; + } + + auto expectedMac = cbcmac(_expandedShardKey, buf.data, buf.cursor - buf.data); + BincodeFixedBytes<8> receivedMac; + buf.unpackFixedBytes<8>(receivedMac); + if (unlikely(expectedMac != receivedMac.data)) { + LOG_ERROR(_env, "Incorrect signature for LogsDBResponse %s from %s", resp.header.kind, *clientAddr); + _logEntries.pop_back(); + return; + } + + if (unlikely(buf.remaining())) { + LOG_ERROR(_env, "Malformed message. Extra %s bytes for LogsDBResponse %s from %s", buf.remaining(), resp.header.kind, *clientAddr); + _logEntries.pop_back(); + } + LOG_DEBUG(_env, "Received response %s for requests id %s from replica id %s", resp.header.kind, resp.header.requestId, resp.replicaId); + } + + void _handleLogsDBRequest(int sockIx, struct sockaddr_in* clientAddr, BincodeBuf& buf) { + LOG_DEBUG(_env, "received LogsDBRequest from %s", *clientAddr); + + auto replicaId = _getReplicaId(clientAddr); + + if (replicaId == LogsDB::REPLICA_COUNT) { + LOG_DEBUG(_env, "We can't match this address to replica. Dropping"); + return; + } + + auto& req = _logEntries.emplace_back().setLogsDBRequest(); + req.replicaId = replicaId; + + try { + req.header.unpack(buf); + } catch (const BincodeException& err) { + LOG_ERROR(_env, "Could not parse LogsDBRequest.header: %s", err.what()); + _logEntries.pop_back(); + return; + } + + if (wyhash64(&_packetDropRand) % 10'000 < _incomingPacketDropProbability) { + LOG_DEBUG(_env, "artificially dropping LogsDBRequest %s", req.header.requestId); + _logEntries.pop_back(); + return; + } + + try { + req.requestContainer.unpack(buf, req.header.kind); + } catch (const BincodeException& exc) { + LOG_ERROR(_env, "Could not parse LogsDBRequest.requestContainer of kind %s: %s", req.header.kind, exc.what()); + _logEntries.pop_back(); + return; + } + + if (unlikely(buf.remaining() < 8)) { + LOG_ERROR(_env, "Could not parse LogsDBRequest of kind %s from %s, message signature is 8 bytes, only %s remaining", req.header.kind, *clientAddr, buf.remaining()); + _logEntries.pop_back(); + return; + } + + auto expectedMac = cbcmac(_expandedShardKey, buf.data, buf.cursor - buf.data); + BincodeFixedBytes<8> receivedMac; + buf.unpackFixedBytes<8>(receivedMac); + if (unlikely(expectedMac != receivedMac.data)) { + LOG_ERROR(_env, "Incorrect signature for LogsDBRequest %s from %s", req.header.kind, *clientAddr); + _logEntries.pop_back(); + return; + } + + if (unlikely(buf.remaining())) { + LOG_ERROR(_env, "Malformed message. 
+    void _handleShardRequest(int sockIx, struct sockaddr_in* clientAddr, BincodeBuf& reqBbuf) {
         LOG_DEBUG(_env, "received message from %s", *clientAddr);

         // First, try to parse the header
@@ -325,13 +569,21 @@ private:
             err = EggsError::MALFORMED_REQUEST;
         }

-        // authenticate, if necessary
-        if (isPrivilegedRequestKind(reqHeader.kind)) {
-            auto expectedMac = cbcmac(_expandedCDCKey, reqBbuf.data, reqBbuf.cursor - reqBbuf.data);
-            BincodeFixedBytes<8> receivedMac;
-            reqBbuf.unpackFixedBytes<8>(receivedMac);
-            if (expectedMac != receivedMac.data) {
-                err = EggsError::NOT_AUTHORISED;
+        if (likely(err == NO_ERROR)) {
+            // authenticate, if necessary
+            if (isPrivilegedRequestKind(reqHeader.kind)) {
+                if (unlikely(reqBbuf.remaining() < 8)) {
+                    LOG_ERROR(_env, "Could not parse request of kind %s from %s, message signature is 8 bytes, only %s remaining", reqHeader.kind, *clientAddr, reqBbuf.remaining());
+                    RAISE_ALERT(_env, "could not parse request of kind %s from %s, will reply with error.", reqHeader.kind, *clientAddr);
+                    err = EggsError::MALFORMED_REQUEST;
+                } else {
+                    auto expectedMac = cbcmac(_expandedCDCKey, reqBbuf.data, reqBbuf.cursor - reqBbuf.data);
+                    BincodeFixedBytes<8> receivedMac;
+                    reqBbuf.unpackFixedBytes<8>(receivedMac);
+                    if (expectedMac != receivedMac.data) {
+                        err = EggsError::NOT_AUTHORISED;
+                    }
+                }
             }
         }
@@ -348,7 +600,7 @@
         if (readOnlyShardReq(_reqContainer.kind())) {
             err = _shared.shardDB.read(_reqContainer, _respContainer);
         } else {
-            auto& entry = _logEntries.emplace_back();
+            auto& entry = _logEntries.emplace_back().setQueuedShardLogEntry();
             entry.sockIx = sockIx;
             entry.clientAddr = *clientAddr;
             entry.receivedAt = t0;
@@ -365,8 +617,7 @@
         Duration elapsed = eggsNow() - t0;
         bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;

-        packResponse(_env, _shared, _sendBuf, _sendHdrs[sockIx], _sendVecs[sockIx], reqHeader.requestId, reqHeader.kind, elapsed, dropArtificially, clientAddr, sockIx, err, _respContainer);
-
+        packShardResponse(_env, _shared, _sendBuf, _sendHdrs[sockIx], _sendVecs[sockIx], reqHeader.requestId, reqHeader.kind, elapsed, dropArtificially, clientAddr, sockIx, err, _respContainer);
     }

 public:
@@ -407,8 +658,16 @@ public:
             }
         }

+        for (auto& msg : _channel.getProtocolMessages(LOG_RESP_PROTOCOL_VERSION)) {
+            _handleLogsDBResponse(msg.socketId, msg.clientAddr, msg.buf);
+        }
+
+        for (auto& msg : _channel.getProtocolMessages(LOG_REQ_PROTOCOL_VERSION)) {
+            _handleLogsDBRequest(msg.socketId, msg.clientAddr, msg.buf);
+        }
+
         for (auto& msg : _channel.getProtocolMessages(SHARD_REQ_PROTOCOL_VERSION)) {
-            _handleRequest(msg.socketId, msg.clientAddr, msg.buf);
+            _handleShardRequest(msg.socketId, msg.clientAddr, msg.buf);
         }

@@ -417,8 +676,8 @@
         size_t numLogEntries = _logEntries.size();
         if (numLogEntries > 0) {
             LOG_DEBUG(_env, "pushing %s log entries to writer", numLogEntries);
-            uint32_t pushed = _shared.logEntriesQueue.push(_logEntries);
-            _shared.logEntriesQueueSize = _shared.logEntriesQueueSize*0.95 + _shared.logEntriesQueue.size()*0.05;
+            uint32_t pushed = _shared.writerRequestsQueue.push(_logEntries);
+            _shared.logEntriesQueueSize = _shared.logEntriesQueueSize*0.95 + _shared.writerRequestsQueue.size()*0.05;
             if (pushed < numLogEntries) {
                 LOG_INFO(_env, "tried to push %s elements to write queue, but pushed %s instead", numLogEntries, pushed);
             }
         }
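The queue-size gauge above is an exponential moving average (new = 0.95·old + 0.05·sample), so spikes decay smoothly instead of whipsawing the metric. The same smoother factored out for reference:

```cpp
// Exponentially weighted moving average with decay alpha in (0, 1]:
// each step keeps (1 - alpha) of the history and admits alpha of the sample.
struct Ewma {
    double value = 0.0;
    double alpha = 0.05; // the shard uses 0.05 for its queue gauges
    void observe(double sample) { value = value * (1.0 - alpha) + sample * alpha; }
};
```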
"pushing %s log entries to writer", numLogEntries); - uint32_t pushed = _shared.logEntriesQueue.push(_logEntries); - _shared.logEntriesQueueSize = _shared.logEntriesQueueSize*0.95 + _shared.logEntriesQueue.size()*0.05; + uint32_t pushed = _shared.writerRequestsQueue.push(_logEntries); + _shared.logEntriesQueueSize = _shared.logEntriesQueueSize*0.95 + _shared.writerRequestsQueue.size()*0.05; if (pushed < numLogEntries) { LOG_INFO(_env, "tried to push %s elements to write queue, but pushed %s instead", numLogEntries, pushed); } @@ -452,15 +711,22 @@ public: struct ShardWriter : Loop { private: ShardShared& _shared; + AES128Key _expandedShardKey; uint64_t _currentLogIndex; ShardRespContainer _respContainer; + std::vector _requests; + std::vector _logEntries; - std::vector _logsdbRequests; - std::vector _logsdbResponses; - std::vector outRequests; - std::vector outResponses; std::unique_ptr _logsDB; + const bool _dontWaitForReplication; + std::vector _logsDBRequests; + std::vector _logsDBResponses; + std::vector _logsDBOutRequests; + std::vector _logsDBOutResponses; + std::unordered_map _inFlightEntries; + std::vector _outgoingLogEntries; + std::shared_ptr> _replicaInfo; // sendmmsg data (one per socket) std::vector _sendBuf; @@ -472,17 +738,19 @@ private: uint64_t _outgoingPacketDropProbability; // probability * 10,000 virtual void sendStop() override { - _shared.logEntriesQueue.close(); + _shared.writerRequestsQueue.close(); } public: ShardWriter(Logger& logger, std::shared_ptr& xmon, ShardReplicaId shrid, const ShardOptions& options, ShardShared& shared) : Loop(logger, xmon, "writer"), _shared(shared), + _dontWaitForReplication(options.dontWaitForReplication), _packetDropRand(eggsNow().ns), _incomingPacketDropProbability(0), _outgoingPacketDropProbability(0) { + expandKey(ShardKey, _expandedShardKey); _currentLogIndex = _shared.shardDB.lastAppliedLogEntry(); auto convertProb = [this](const std::string& what, double prob, uint64_t& iprob) { if (prob != 0.0) { @@ -493,109 +761,324 @@ public: }; convertProb("incoming", options.simulateIncomingPacketDrop, _incomingPacketDropProbability); convertProb("outgoing", options.simulateOutgoingPacketDrop, _outgoingPacketDropProbability); - _logEntries.reserve(MAX_WRITES_AT_ONCE); + _requests.reserve(MAX_WRITES_AT_ONCE); if (options.writeToLogsDB) { _logsDB.reset(new LogsDB(_env,_shared.sharedDB,shrid.replicaId(), _currentLogIndex, options.dontWaitForReplication, options.dontDoReplication, options.forceLeader, options.avoidBeingLeader, options.initialStart, options.initialStart ? 
_currentLogIndex : 0)); - _logsDB->processIncomingMessages(_logsdbRequests, _logsdbResponses); + _logsDB->processIncomingMessages(_logsDBRequests, _logsDBResponses); + _shared.isLeader.store(_logsDB->isLeader(), std::memory_order_relaxed); + } else { + ALWAYS_ASSERT(shrid.replicaId() == 0); + _shared.isLeader.store(true, std::memory_order_relaxed); } } virtual ~ShardWriter() = default; - virtual void step() override { - _logEntries.clear(); - _sendBuf.clear(); - for (int i = 0; i < _sendHdrs.size(); i++) { - _sendHdrs[i].clear(); - _sendVecs[i].clear(); - } - uint32_t pulled = _shared.logEntriesQueue.pull(_logEntries, MAX_WRITES_AT_ONCE); - if (likely(pulled > 0)) { - LOG_DEBUG(_env, "pulled %s requests from write queue", pulled); - _shared.pulledWriteRequests = _shared.pulledWriteRequests*0.95 + ((double)pulled)*0.05; - } else { - // queue is closed, stop - stop(); - } + void logsDBStep() { + LOG_DEBUG(_env, "Calling LogsDB processIncomingMessages with %s requests and %s responses", _logsDBRequests.size(), _logsDBResponses.size()); + _logsDB->processIncomingMessages(_logsDBRequests,_logsDBResponses); + _shared.isLeader.store(_logsDB->isLeader(), std::memory_order_relaxed); - std::vector entries; - if (_logsDB) { + // If we are leader write any outstanding entries + std::vector logsDBEntries; + if (!_logsDB->isLeader()) { + //TODO: we could notify clients we are no longer leader + if (_logEntries.size() > 0) { + LOG_INFO(_env, "Received %s shard write requests but we are not log leader. dropping", _logEntries.size()); + _logEntries.clear(); + } + if (_inFlightEntries.size() > 0) { + LOG_INFO(_env, "There are %s in flight shard write requests but we are no longer log leader. dropping", _inFlightEntries.size()); + _inFlightEntries.clear(); + } + } else if (_logEntries.size()) { + logsDBEntries.reserve(_logEntries.size()); std::vector data; data.resize(MAX_UDP_MTU); - entries.reserve(_logEntries.size()); - for (auto& logEntry : _logEntries) { - entries.emplace_back(); - auto& entry = entries.back(); + logsDBEntries.reserve(_requests.size()); + LogIdx expectedIdx = _currentLogIndex + _inFlightEntries.size(); + for (auto& queuedLogEntry : _logEntries) { + auto& logsDBEntry = logsDBEntries.emplace_back();; + queuedLogEntry.logEntry.idx = ++expectedIdx; BincodeBuf buf((char*)&data[0], MAX_UDP_MTU); - logEntry.logEntry.pack(buf); - entry.value.assign(buf.data, buf.cursor); + queuedLogEntry.logEntry.pack(buf); + logsDBEntry.value.assign(buf.data, buf.cursor); } - auto err = _logsDB->appendEntries(entries); + auto err = _logsDB->appendEntries(logsDBEntries); ALWAYS_ASSERT(err == NO_ERROR); - _logsDB->processIncomingMessages(_logsdbRequests, _logsdbResponses); - _logsDB->getOutgoingMessages(outRequests, outResponses); - entries.clear(); + ALWAYS_ASSERT(_logEntries.size() == logsDBEntries.size()); + for (size_t i = 0; i < _logEntries.size(); ++i) { + if (logsDBEntries[i].idx == 0) { + // if we don't wait for replication the window gets cleared immediately + ALWAYS_ASSERT(!_dontWaitForReplication); + ALWAYS_ASSERT(_inFlightEntries.size() == LogsDB::IN_FLIGHT_APPEND_WINDOW); + LOG_INFO(_env, "Appended %s out of %s shard write requests. Log in flight windows is full. 
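Leader appends assign consecutive indices up front (last applied index plus entries already in flight) and park each request in `_inFlightEntries` until LogsDB hands the entry back as durably replicated; an idx of 0 from `appendEntries` signals that the in-flight window is full. A toy model of that bookkeeping, with an illustrative window size standing in for `LogsDB::IN_FLIGHT_APPEND_WINDOW`:

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

constexpr size_t kWindow = 128; // hypothetical stand-in for the real window

struct Pending { uint64_t idx; std::string payload; };

// Assign consecutive indices after everything already in flight; stop
// accepting once the window is full, mirroring appendEntries returning idx 0.
uint64_t enqueueAppends(uint64_t lastApplied,
                        std::unordered_map<uint64_t, Pending>& inFlight,
                        std::vector<std::string>& batch) {
    uint64_t next = lastApplied + inFlight.size();
    size_t accepted = 0;
    for (auto& payload : batch) {
        if (inFlight.size() >= kWindow) break; // window full, drop the rest
        ++next;
        inFlight.emplace(next, Pending{next, std::move(payload)});
        ++accepted;
    }
    batch.erase(batch.begin(), batch.begin() + accepted);
    return next; // highest index handed out
}
```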
+
+        // LogsDB, when not actively replicating, is not chatty, but its messages are
+        // higher priority: they advance the replication state under high load, so we
+        // pack them first to give them priority when sending out.
+        _logsDB->getOutgoingMessages(_logsDBOutRequests, _logsDBOutResponses);
+
+        for (auto& response : _logsDBOutResponses) {
+            packLogsDBResponse(response);
+        }
+
+        for (auto request : _logsDBOutRequests) {
+            packLogsDBRequest(*request);
+        }
+
+        _logsDB->readEntries(logsDBEntries);
+        _outgoingLogEntries.reserve(logsDBEntries.size());
+        if (_dontWaitForReplication && _logsDB->isLeader()) {
+            ALWAYS_ASSERT(_inFlightEntries.size() == logsDBEntries.size());
+        }
+
+        for (auto& logsDBEntry : logsDBEntries) {
+            ++_currentLogIndex;
+            ALWAYS_ASSERT(_currentLogIndex == logsDBEntry.idx);
+            ALWAYS_ASSERT(logsDBEntry.value.size() > 0);
+            BincodeBuf buf((char*)&logsDBEntry.value.front(), logsDBEntry.value.size());
+            ShardLogEntry shardEntry;
+            shardEntry.unpack(buf);
+            ALWAYS_ASSERT(_currentLogIndex == shardEntry.idx);
+            auto it = _inFlightEntries.find(shardEntry.idx.u64);
+
+            if (_dontWaitForReplication && _logsDB->isLeader()) {
+                ALWAYS_ASSERT(it != _inFlightEntries.end());
+                ALWAYS_ASSERT(shardEntry == it->second.logEntry);
+            }
+
+            auto err = _shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, _respContainer);
+            if (it != _inFlightEntries.end()) {
+                auto& logEntry = _outgoingLogEntries.emplace_back(std::move(it->second));
+                _inFlightEntries.erase(it);
+
+                ALWAYS_ASSERT(shardEntry == logEntry.logEntry);
+                if (likely(logEntry.requestId)) {
+                    LOG_DEBUG(_env, "applying log entry for request %s kind %s from %s", logEntry.requestId, logEntry.requestKind, logEntry.clientAddr);
+                    Duration elapsed = eggsNow() - logEntry.receivedAt;
+                    bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
+                    packShardResponse(_env, _shared, _sendBuf, _sendHdrs[logEntry.sockIx], _sendVecs[logEntry.sockIx], logEntry.requestId, logEntry.requestKind, elapsed, dropArtificially, &logEntry.clientAddr, logEntry.sockIx, err, _respContainer);
+                } else {
+                    LOG_DEBUG(_env, "applying request-less log entry");
+                    if (unlikely(err != NO_ERROR)) {
+                        RAISE_ALERT(_env, "could not apply request-less log entry: %s", err);
+                    }
+                }
+            }
+        }
+        _shared.shardDB.flush(true);
+        _logsDB->flush(true);
+
+        sendMessages();
+    }
+
+    void noLogsDbStep() {
+        for (auto& logEntry : _logEntries) {
             if (likely(logEntry.requestId)) {
                 LOG_DEBUG(_env, "applying log entry for request %s kind %s from %s", logEntry.requestId, logEntry.requestKind, logEntry.clientAddr);
             } else {
                 LOG_DEBUG(_env, "applying request-less log entry");
             }
-            _currentLogIndex++;
-            EggsError err = NO_ERROR;
-            if (_logsDB) {
-                auto& logsdbEntry = entries[entriesIdx++];
-                ALWAYS_ASSERT(_currentLogIndex == logsdbEntry.idx);
-                ALWAYS_ASSERT(logsdbEntry.value.size() > 0);
-                BincodeBuf buf((char*)&logsdbEntry.value.front(), logsdbEntry.value.size());
-                ShardLogEntry shardEntry;
-                shardEntry.unpack(buf);
-                ALWAYS_ASSERT(shardEntry == logEntry.logEntry);
-                err = _shared.shardDB.applyLogEntry(logsdbEntry.idx.u64, shardEntry, _respContainer);
-            } else {
-                err = _shared.shardDB.applyLogEntry(_currentLogIndex, logEntry.logEntry, _respContainer);
-            }
+
+            auto err = _shared.shardDB.applyLogEntry(++_currentLogIndex, logEntry.logEntry, _respContainer);
             if (likely(logEntry.requestId)) {
                 Duration elapsed = eggsNow() - logEntry.receivedAt;
                 bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
-                packResponse(_env, _shared, _sendBuf, _sendHdrs[logEntry.sockIx], _sendVecs[logEntry.sockIx], logEntry.requestId, logEntry.requestKind, elapsed, dropArtificially, &logEntry.clientAddr, logEntry.sockIx, err, _respContainer);
+                packShardResponse(_env, _shared, _sendBuf, _sendHdrs[logEntry.sockIx], _sendVecs[logEntry.sockIx], logEntry.requestId, logEntry.requestKind, elapsed, dropArtificially, &logEntry.clientAddr, logEntry.sockIx, err, _respContainer);
             } else if (unlikely(err != NO_ERROR)) {
                 RAISE_ALERT(_env, "could not apply request-less log entry: %s", err);
             }
         }
-        if (pulled > 0) {
-            LOG_DEBUG(_env, "flushing and sending %s writes", pulled);
+        if (_requests.size() > 0) {
+            LOG_DEBUG(_env, "flushing and sending %s writes", _requests.size());
             _shared.shardDB.flush(true);
             // important to send all of them after the flush! otherwise it's not durable yet
-            for (int i = 0; i < _sendHdrs.size(); i++) {
-                if (_sendHdrs[i].size() == 0) { continue; }
-                LOG_DEBUG(_env, "sending %s write responses to socket %s", _sendHdrs[i].size(), i);
-                for (int j = 0; j < _sendHdrs[i].size(); j++) {
-                    auto& vec = _sendVecs[i][j];
-                    vec.iov_base = &_sendBuf[(size_t)vec.iov_base];
-                    auto& hdr = _sendHdrs[i][j];
-                    hdr.msg_hdr.msg_iov = &vec;
-                }
-                int ret = sendmmsg(_shared.socks[i].fd, &_sendHdrs[i][0], _sendHdrs[i].size(), 0);
-                if (unlikely(ret < 0)) {
-                    // we get this when nf drops packets
-                    if (errno != EPERM) {
-                        throw SYSCALL_EXCEPTION("sendto");
-                    } else {
-                        LOG_INFO(_env, "dropping %s responses because of EPERM", _sendHdrs[i].size());
-                    }
-                } else if (unlikely(ret < _sendHdrs[i].size())) {
-                    LOG_INFO(_env, "dropping %s out of %s requests since `sendmmsg` could not send them all", _sendHdrs[i].size()-ret, _sendHdrs[i].size());
-                }
-            }
+            sendMessages();
         }
     }

+    ReplicaAddrsInfo* addressFromReplicaId(ReplicaId id) {
+        if (!_replicaInfo) {
+            return nullptr;
+        }
+
+        auto& replicaInfo = (*_replicaInfo)[id.u8];
+        if (replicaInfo.addrs[0].sin_addr.s_addr == 0) {
+            return nullptr;
+        }
+        return &replicaInfo;
+    }
+
+    void packLogsDBResponse(LogsDBResponse& response) {
+        auto addrInfoPtr = addressFromReplicaId(response.replicaId);
+        if (unlikely(addrInfoPtr == nullptr)) {
+            LOG_DEBUG(_env, "No information for replica id %s. Dropping response", response.replicaId);
+            return;
+        }
+        auto& addrInfo = *addrInfoPtr;
+
+        bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
+        if (unlikely(dropArtificially)) {
+            LOG_DEBUG(_env, "artificially dropping response %s", response.header.requestId);
+            return;
+        }
+        // pack into sendBuf
+        size_t sendBufBegin = _sendBuf.size();
+        _sendBuf.resize(sendBufBegin + MAX_UDP_MTU);
+        BincodeBuf respBbuf(&_sendBuf[sendBufBegin], MAX_UDP_MTU);
+        response.header.pack(respBbuf);
+        response.responseContainer.pack(respBbuf);
+        respBbuf.packFixedBytes<8>({cbcmac(_expandedShardKey, respBbuf.data, respBbuf.cursor - respBbuf.data)});
+        _sendBuf.resize(sendBufBegin + respBbuf.len());
+
+        auto now = eggsNow(); // randomly pick one of the shard addrs and one of our sockets
+        int whichReplicaAddr = now.ns & !!addrInfo.addrs[1].sin_port;
+        int whichSock = (now.ns>>1) & !!_shared.ips[1];
+
+        LOG_DEBUG(_env, "will send response for req id %s kind %s to %s", response.header.requestId, response.header.kind, addrInfo.addrs[whichReplicaAddr]);
+
+        // Prepare sendmmsg stuff. The vectors might be resized by the
+        // time we get to sending this, so store references when we must
+        // -- we'll fix up the actual values later.
+        auto& hdr = _sendHdrs[whichSock].emplace_back();
+        hdr.msg_hdr = {
+            .msg_name = (sockaddr_in*)&addrInfo.addrs[whichReplicaAddr],
+            .msg_namelen = sizeof(addrInfo.addrs[whichReplicaAddr]),
+            .msg_iovlen = 1,
+        };
+        hdr.msg_len = respBbuf.len();
+        auto& vec = _sendVecs[whichSock].emplace_back();
+        vec.iov_base = (void*)sendBufBegin;
+        vec.iov_len = respBbuf.len();
+    }
+
+    void packLogsDBRequest(LogsDBRequest& request) {
+        auto addrInfoPtr = addressFromReplicaId(request.replicaId);
+        if (unlikely(addrInfoPtr == nullptr)) {
+            LOG_DEBUG(_env, "No information for replica id %s. Dropping request", request.replicaId);
+            return;
+        }
+        auto& addrInfo = *addrInfoPtr;
+
+        bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
+        if (unlikely(dropArtificially)) {
+            LOG_DEBUG(_env, "artificially dropping request %s", request.header.requestId);
+            return;
+        }
+        // pack into sendBuf
+        size_t sendBufBegin = _sendBuf.size();
+        _sendBuf.resize(sendBufBegin + MAX_UDP_MTU);
+        BincodeBuf respBbuf(&_sendBuf[sendBufBegin], MAX_UDP_MTU);
+        request.header.pack(respBbuf);
+        request.requestContainer.pack(respBbuf);
+        respBbuf.packFixedBytes<8>({cbcmac(_expandedShardKey, respBbuf.data, respBbuf.cursor - respBbuf.data)});
+        _sendBuf.resize(sendBufBegin + respBbuf.len());
+
+        request.sentTime = eggsNow(); // randomly pick one of the shard addrs and one of our sockets
+        int whichReplicaAddr = request.sentTime.ns & !!addrInfo.addrs[1].sin_port;
+        int whichSock = (request.sentTime.ns>>1) & !!_shared.ips[1];
+
+        LOG_DEBUG(_env, "will send request for req id %s kind %s to %s", request.header.requestId, request.header.kind, addrInfo.addrs[whichReplicaAddr]);
+
+        // Prepare sendmmsg stuff. The vectors might be resized by the
+        // time we get to sending this, so store references when we must
+        // -- we'll fix up the actual values later.
+        auto& hdr = _sendHdrs[whichSock].emplace_back();
+        hdr.msg_hdr = {
+            .msg_name = (sockaddr_in*)&addrInfo.addrs[whichReplicaAddr],
+            .msg_namelen = sizeof(addrInfo.addrs[whichReplicaAddr]),
+            .msg_iovlen = 1,
+        };
+        hdr.msg_len = respBbuf.len();
+        auto& vec = _sendVecs[whichSock].emplace_back();
+        vec.iov_base = (void*)sendBufBegin;
+        vec.iov_len = respBbuf.len();
+    }
+    void sendMessages() {
+        for (int i = 0; i < _sendHdrs.size(); i++) {
+            if (_sendHdrs[i].size() == 0) { continue; }
+            LOG_DEBUG(_env, "sending %s messages to socket %s", _sendHdrs[i].size(), i);
+            for (int j = 0; j < _sendHdrs[i].size(); j++) {
+                auto& vec = _sendVecs[i][j];
+                vec.iov_base = &_sendBuf[(size_t)vec.iov_base];
+                auto& hdr = _sendHdrs[i][j];
+                hdr.msg_hdr.msg_iov = &vec;
+            }
+            int ret = sendmmsg(_shared.socks[i].fd, &_sendHdrs[i][0], _sendHdrs[i].size(), 0);
+            if (unlikely(ret < 0)) {
+                // we get this when nf drops packets
+                if (errno != EPERM) {
+                    throw SYSCALL_EXCEPTION("sendmmsg");
+                } else {
+                    LOG_INFO(_env, "dropping %s messages because of EPERM", _sendHdrs[i].size());
+                }
+            } else if (unlikely(ret < _sendHdrs[i].size())) {
+                LOG_INFO(_env, "dropping %s out of %s messages since `sendmmsg` could not send them all", _sendHdrs[i].size()-ret, _sendHdrs[i].size());
+            }
+        }
+    }
+
+    virtual void step() override {
+        _requests.clear();
+        _logsDBRequests.clear();
+        _logsDBResponses.clear();
+        _logsDBOutRequests.clear();
+        _logsDBOutResponses.clear();
+        _logEntries.clear();
+        _outgoingLogEntries.clear();
+        _sendBuf.clear();
+        for (int i = 0; i < _sendHdrs.size(); i++) {
+            _sendHdrs[i].clear();
+            _sendVecs[i].clear();
+        }
+        _replicaInfo = std::atomic_load(&_shared.replicas);
+        uint32_t pulled = _shared.writerRequestsQueue.pull(_requests, MAX_WRITES_AT_ONCE, _logsDB ? _logsDB->getNextTimeout() : -1);
+        if (likely(pulled > 0)) {
+            LOG_DEBUG(_env, "pulled %s requests from write queue", pulled);
+            _shared.pulledWriteRequests = _shared.pulledWriteRequests*0.95 + ((double)pulled)*0.05;
+        }
+        if (unlikely(_shared.writerRequestsQueue.isClosed())) {
+            // queue is closed, stop
+            stop();
+            return;
+        }
+
+        for (auto& request : _requests) {
+            switch (request.kind()) {
+            case WriterQueueEntryKind::LOGSDB_REQUEST:
+                _logsDBRequests.emplace_back(request.moveLogsDBRequest());
+                break;
+            case WriterQueueEntryKind::LOGSDB_RESPONSE:
+                _logsDBResponses.emplace_back(request.moveLogsDBResponse());
+                break;
+            case WriterQueueEntryKind::SHARD_LOG_ENTRY:
+                _logEntries.emplace_back(request.moveQueuedShardLogEntry());
+                break;
+            }
+        }
+
+        if (_logsDB) {
+            logsDBStep();
+        } else {
+            noLogsDbStep();
+        }
+    }
 };
@@ -612,7 +1095,7 @@ private:
     AddrsInfo _info;
     bool _infoLoaded;
     bool _registerCompleted;
-    uint8_t _leaderReplicaId;
+    std::array<AddrsInfo, LogsDB::REPLICA_COUNT> _replicas;
 public:
     ShardRegisterer(Logger& logger, std::shared_ptr& xmon, ShardReplicaId shrid, const ShardOptions& options, ShardShared& shared) :
         PeriodicLoop(logger, xmon, "registerer", {1_sec, 1, 1_mins, 0.1}),
@@ -622,8 +1105,7 @@ public:
         _shucklePort(options.shucklePort),
         _hasSecondIp(options.ipPorts[1].port != 0),
         _infoLoaded(false),
-        _registerCompleted(false),
-        _leaderReplicaId(options.leaderReplicaId)
+        _registerCompleted(false)
     {}

     virtual ~ShardRegisterer() = default;
@@ -668,14 +1150,28 @@ public:
             _env.updateAlert(_alert, "AddrsInfo in shuckle: %s , not matching local AddrsInfo: %s", replicas[_shrid.replicaId().u8], _info);
             return false;
         }
-        {
-            std::lock_guard guard(_shared.replicasLock);
-            _shared.replicas = replicas;
+        if (_replicas != replicas) {
+            _replicas = replicas;
+            auto replicaUpdatePtr = std::make_shared<std::array<ReplicaAddrsInfo, LogsDB::REPLICA_COUNT>>();
+            auto& replicaUpdate = *replicaUpdatePtr;
+            for (size_t i = 0; i < _replicas.size(); ++i) {
+                uint32_t ip;
+                memcpy(&ip, _replicas[i].ip1.data.data(), _replicas[i].ip1.data.size());
+                replicaUpdate[i].addrs[0].sin_family = AF_INET;
+                replicaUpdate[i].addrs[0].sin_addr.s_addr = ip;
+                memcpy(&ip, _replicas[i].ip2.data.data(), _replicas[i].ip2.data.size());
+                replicaUpdate[i].addrs[1].sin_family = AF_INET;
+                replicaUpdate[i].addrs[1].sin_addr.s_addr = ip;
+
+                replicaUpdate[i].addrs[0].sin_port = htons(_replicas[i].port1);
+                replicaUpdate[i].addrs[1].sin_port = htons(_replicas[i].port2);
+            }
+            std::atomic_exchange(&_shared.replicas, replicaUpdatePtr);
         }
     }

     LOG_INFO(_env, "Registering ourselves (shard %s, %s) with shuckle", _shrid, _info);
-    err = registerShardReplica(_shuckleHost, _shucklePort, 10_sec, _shrid, _shrid.replicaId() == _leaderReplicaId, _info);
+    err = registerShardReplica(_shuckleHost, _shucklePort, 10_sec, _shrid, _shared.isLeader.load(std::memory_order_relaxed), _info);
     if (!err.empty()) {
         _env.updateAlert(_alert, "Couldn't register ourselves with shuckle: %s", err);
         return false;
diff --git a/cpp/shard/ShardDB.cpp b/cpp/shard/ShardDB.cpp
index 2184a33f..85210151 100644
--- a/cpp/shard/ShardDB.cpp
+++ b/cpp/shard/ShardDB.cpp
@@ -123,6 +123,7 @@ static int pickMtu(uint16_t mtu) {

 void ShardLogEntry::pack(BincodeBuf& buf) const {
     buf.packScalar(SHARD_LOG_PROTOCOL_VERSION);
+    idx.pack(buf);
     time.pack(buf);
     buf.packScalar((uint16_t)body.kind());
     body.pack(buf);
@@ -131,6 +132,7 @@
 void ShardLogEntry::unpack(BincodeBuf& buf) {
     uint32_t protocol = buf.unpackScalar();
     ALWAYS_ASSERT(protocol == SHARD_LOG_PROTOCOL_VERSION);
+    idx.unpack(buf);
     time.unpack(buf);
     ShardLogEntryKind kind = (ShardLogEntryKind)buf.unpackScalar();
     body.unpack(buf, kind);
diff --git a/cpp/shard/ShardDB.hpp b/cpp/shard/ShardDB.hpp
index f989fb5a..7a639e6b 100644
--- a/cpp/shard/ShardDB.hpp
+++ b/cpp/shard/ShardDB.hpp
@@ -12,6 +12,7 @@

 struct ShardLogEntry {
+    LogIdx idx;
     EggsTime time;
     ShardLogEntryContainer body;
diff --git a/cpp/shard/ShardKey.hpp b/cpp/shard/ShardKey.hpp
new file mode 100644
index 00000000..a446b57c
--- /dev/null
+++ b/cpp/shard/ShardKey.hpp
@@ -0,0 +1,7 @@
+#pragma once
+
+#include <array>
+#include <cstdint>
+
+
+static const std::array<uint8_t, 16> ShardKey{0x0d, 0x57, 0x82, 0xed, 0x77, 0x90, 0xd6, 0x69, 0x0f, 0x7d, 0x22, 0x44, 0x19, 0x5a, 0xff, 0x0b};
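`ShardLogEntry` now carries its own `LogIdx` in the serialized form, which is what lets `logsDBStep()` assert `_currentLogIndex == shardEntry.idx` after a round trip through LogsDB. A toy version of that framing; the version constant is illustrative only (the real code uses `BincodeBuf` and `SHARD_LOG_PROTOCOL_VERSION`, and relies on that constant to gate compatibility):

```cpp
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <vector>

// Toy framing in the spirit of ShardLogEntry: version, then the entry's own
// index, then the payload. Echoing the index inside the value lets the
// applier check that LogsDB's index and the entry agree end to end.
constexpr uint32_t kVersion = 1; // hypothetical, not SHARD_LOG_PROTOCOL_VERSION

std::vector<uint8_t> packEntry(uint64_t idx, const std::vector<uint8_t>& body) {
    std::vector<uint8_t> out(sizeof kVersion + sizeof idx);
    std::memcpy(out.data(), &kVersion, sizeof kVersion);
    std::memcpy(out.data() + sizeof kVersion, &idx, sizeof idx);
    out.insert(out.end(), body.begin(), body.end());
    return out;
}

uint64_t unpackIdx(const std::vector<uint8_t>& in) {
    uint32_t version;
    std::memcpy(&version, in.data(), sizeof version);
    if (version != kVersion) throw std::runtime_error("bad version");
    uint64_t idx;
    std::memcpy(&idx, in.data() + sizeof version, sizeof idx);
    return idx;
}
```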
diff --git a/go/eggsrun/eggsrun.go b/go/eggsrun/eggsrun.go
index bb150d96..cada31a6 100644
--- a/go/eggsrun/eggsrun.go
+++ b/go/eggsrun/eggsrun.go
@@ -39,6 +39,7 @@ func main() {
     xmon := flag.String("xmon", "", "")
     shuckleScriptsJs := flag.String("shuckle-scripts-js", "", "")
     noFuse := flag.Bool("no-fuse", false, "")
+    useLogsDB := flag.Bool("use-logsdb", false, "Spin up replicas for each shard; replica 0 starts as leader, the rest as followers")
     flag.Parse()
     noRunawayArgs()
@@ -189,25 +190,40 @@ func main() {
         procs.StartCDC(log, *repoDir, &opts)
     }

+    replicaCount := uint8(1)
+    if *useLogsDB {
+        replicaCount = 5
+    }
     // Start shards
     for i := 0; i < 256; i++ {
-        shid := msgs.ShardId(i)
-        opts := managedprocess.ShardOpts{
-            Exe:            cppExes.ShardExe,
-            Dir:            path.Join(*dataDir, fmt.Sprintf("shard_%03d", i)),
-            LogLevel:       level,
-            Shid:           shid,
-            Valgrind:       *buildType == "valgrind",
-            ShuckleAddress: shuckleAddress,
-            Perf:           *profile,
-            Xmon:           *xmon,
+        for r := uint8(0); r < replicaCount; r++ {
+            shrid := msgs.MakeShardReplicaId(msgs.ShardId(i), msgs.ReplicaId(r))
+            opts := managedprocess.ShardOpts{
+                Exe:            cppExes.ShardExe,
+                Shrid:          shrid,
+                Dir:            path.Join(*dataDir, fmt.Sprintf("shard_%03d", i)),
+                LogLevel:       level,
+                Valgrind:       *buildType == "valgrind",
+                ShuckleAddress: shuckleAddress,
+                Perf:           *profile,
+                Xmon:           *xmon,
+                UseLogsDB:      "",
+            }
+            if *useLogsDB {
+                opts.Dir = path.Join(*dataDir, fmt.Sprintf("shard_%03d_%d", i, r))
+                if r == 0 {
+                    opts.UseLogsDB = "LEADER"
+                } else {
+                    opts.UseLogsDB = "FOLLOWER"
+                }
+            }
+            if *startingPort != 0 {
+                opts.Addr1 = fmt.Sprintf("127.0.0.1:%v", uint16(*startingPort)+1+uint16(i)+uint16(r)*256)
+            } else {
+                opts.Addr1 = "127.0.0.1:0"
+            }
+            procs.StartShard(log, *repoDir, &opts)
         }
-        if *startingPort != 0 {
-            opts.Addr1 = fmt.Sprintf("127.0.0.1:%v", uint16(*startingPort)+1+uint16(i))
-        } else {
-            opts.Addr1 = "127.0.0.1:0"
-        }
-        procs.StartShard(log, *repoDir, &opts)
     }

     fmt.Printf("waiting for shards/cdc for %v...\n", waitShuckleFor)
diff --git a/go/eggstests/eggstests.go b/go/eggstests/eggstests.go
index 82959328..afe81301 100644
--- a/go/eggstests/eggstests.go
+++ b/go/eggstests/eggstests.go
@@ -853,6 +853,7 @@ func main() {
     blockServiceKiller := flag.Bool("block-service-killer", false, "Go around killing block services to stimulate paths recovering from that.")
     race := flag.Bool("race", false, "Go race detector")
     shuckleBeaconPort := flag.Uint("shuckle-beacon-port", 0, "")
+    useLogsDB := flag.Bool("use-logsdb", false, "Spin up replicas for each shard; replica 0 starts as leader, the rest as followers")
     flag.Var(&overrides, "cfg", "Config overrides")
     flag.Parse()
     noRunawayArgs()
@@ -1106,9 +1107,11 @@ func main() {
     }

     // Start CDC
+
     cdcOpts := &managedprocess.CDCOpts{
         Exe:       cppExes.CDCExe,
         Dir:       path.Join(*dataDir, "cdc"),
+        ReplicaId: msgs.ReplicaId(0),
         LogLevel:  level,
         Valgrind:  *buildType == "valgrind",
         Perf:      *profile,
@@ -1122,25 +1125,41 @@
     }
     procs.StartCDC(log, *repoDir, cdcOpts)

+    replicaCount := uint8(1)
+    if *useLogsDB {
+        replicaCount = 5
+    }
+
     // Start shards
     numShards := 256
     for i := 0; i < numShards; i++ {
-        shid := msgs.ShardId(i)
-        shopts := managedprocess.ShardOpts{
-            Exe:                       cppExes.ShardExe,
-            Dir:                       path.Join(*dataDir, fmt.Sprintf("shard_%03d", i)),
-            LogLevel:                  level,
-            Shid:                      shid,
-            Valgrind:                  *buildType == "valgrind",
-            Perf:                      *profile,
-            IncomingPacketDrop:        *incomingPacketDrop,
-            OutgoingPacketDrop:        *outgoingPacketDrop,
-            ShuckleAddress:            shuckleAddress,
-            Addr1:                     "127.0.0.1:0",
-            Addr2:                     "127.0.0.1:0",
-            TransientDeadlineInterval: &testTransientDeadlineInterval,
+        for r := uint8(0); r < replicaCount; r++ {
+            shrid := msgs.MakeShardReplicaId(msgs.ShardId(i), msgs.ReplicaId(r))
+            shopts := managedprocess.ShardOpts{
+                Exe:                       cppExes.ShardExe,
+                Dir:                       path.Join(*dataDir, fmt.Sprintf("shard_%03d", i)),
+                LogLevel:                  level,
+                Shrid:                     shrid,
+                Valgrind:                  *buildType == "valgrind",
+                Perf:                      *profile,
+                IncomingPacketDrop:        *incomingPacketDrop,
+                OutgoingPacketDrop:        *outgoingPacketDrop,
+                ShuckleAddress:            shuckleAddress,
+                Addr1:                     "127.0.0.1:0",
+                Addr2:                     "127.0.0.1:0",
+                TransientDeadlineInterval: &testTransientDeadlineInterval,
+                UseLogsDB:                 "",
+            }
+            if *useLogsDB {
+                shopts.Dir = path.Join(*dataDir, fmt.Sprintf("shard_%03d_%d", i, r))
+                if r == 0 {
+                    shopts.UseLogsDB = "LEADER"
+                } else {
+                    shopts.UseLogsDB = "FOLLOWER"
+                }
+            }
+            procs.StartShard(log, *repoDir, &shopts)
         }
-        procs.StartShard(log, *repoDir, &shopts)
     }

     // now wait for shards/cdc
diff --git a/go/managedprocess/managedprocess.go b/go/managedprocess/managedprocess.go
index ace510cf..50a62380 100644
--- a/go/managedprocess/managedprocess.go
+++ b/go/managedprocess/managedprocess.go
@@ -461,8 +461,8 @@ func BuildGoExes(ll *lib.Logger, repoDir string, race bool) *GoExes {
 type ShardOpts struct {
     Exe                       string
     Dir                       string
+    Shrid                     msgs.ShardReplicaId
     LogLevel                  lib.LogLevel
-    Shid                      msgs.ShardId
     Valgrind                  bool
     Perf                      bool
     IncomingPacketDrop        float64
@@ -472,6 +472,7 @@
     Addr2                     string
     TransientDeadlineInterval *time.Duration
     Xmon                      string
+    UseLogsDB                 string
 }

 func (procs *ManagedProcesses) StartShard(ll *lib.Logger, repoDir string, opts *ShardOpts) {
@@ -495,6 +496,9 @@
     if opts.Xmon != "" {
         args = append(args, "-xmon", opts.Xmon)
     }
+    if opts.UseLogsDB != "" {
+        args = append(args, "-use-logsdb", opts.UseLogsDB)
+    }
     switch opts.LogLevel {
     case lib.TRACE:
         args = append(args, "-log-level", "trace")
@@ -507,11 +511,12 @@
     }
     args = append(args,
         opts.Dir,
-        fmt.Sprintf("%d", int(opts.Shid)),
+        fmt.Sprintf("%d", int(opts.Shrid.Shard())),
+        fmt.Sprintf("%d", int(opts.Shrid.Replica())),
     )
     cppDir := cppDir(repoDir)
     mpArgs := ManagedProcessArgs{
-        Name:       fmt.Sprintf("shard %v", opts.Shid),
+        Name:       fmt.Sprintf("shard %v", opts.Shrid),
         Exe:        opts.Exe,
         Args:       args,
         StdoutFile: path.Join(opts.Dir, "stdout"),
@@ -553,6 +558,7 @@
 type CDCOpts struct {
     Exe       string
     Dir       string
+    ReplicaId msgs.ReplicaId
     LogLevel  lib.LogLevel
     Valgrind  bool
     Perf      bool
@@ -592,10 +598,10 @@
     case lib.ERROR:
         args = append(args, "-log-level", "error")
     }
-    args = append(args, opts.Dir)
+    args = append(args, opts.Dir, fmt.Sprintf("%d", int(opts.ReplicaId)))
     cppDir := cppDir(repoDir)
     mpArgs := ManagedProcessArgs{
-        Name:       "cdc",
+        Name:       fmt.Sprintf("cdc %v", opts.ReplicaId),
         Exe:        opts.Exe,
         Args:       args,
         StdoutFile: path.Join(opts.Dir, "stdout"),
diff --git a/integration.py b/integration.py
index aedbb912..81acd0e8 100755
--- a/integration.py
+++ b/integration.py
@@ -9,6 +9,7 @@ from common import *
 parser = argparse.ArgumentParser()
 parser.add_argument('--short', action='store_true')
 parser.add_argument('--docker', action='store_true')
+parser.add_argument('--logsdb', action='store_true')
 args = parser.parse_args()

 script_dir = os.path.dirname(os.path.realpath(__file__))
@@ -39,16 +40,17 @@ else:
 bold_print('integration tests')

 short = ['-short'] if args.short else []
+logsdb = ['-use-logsdb'] if args.logsdb else []

 # -block-service-killer does not work with FUSE driver (the duplicated FDs
 # of the child processes confuse the FUSE driver).
 tests = [
-    ['./go/eggstests/eggstests', '-binaries-dir', build_sanitized, '-verbose', '-repo-dir', '.', '-tmp-dir', '.', '-filter', fuse_tests, '-outgoing-packet-drop', '0.02'] + short,
-    ['./go/eggstests/eggstests', '-binaries-dir', build_release, '-preserve-data-dir', '-verbose', '-block-service-killer', '-filter', 'direct', '-repo-dir', '.', '-tmp-dir', '.'] + short,
+    ['./go/eggstests/eggstests', '-binaries-dir', build_sanitized, '-verbose', '-repo-dir', '.', '-tmp-dir', '.', '-filter', fuse_tests, '-outgoing-packet-drop', '0.02'] + short + logsdb,
+    ['./go/eggstests/eggstests', '-binaries-dir', build_release, '-preserve-data-dir', '-verbose', '-block-service-killer', '-filter', 'direct', '-repo-dir', '.', '-tmp-dir', '.'] + short + logsdb,
 ]
 if not args.short:
     # valgrind is super slow; it has surfaced bugs in the past, but run the short
     # versions only in the long tests.
-    tests.append(['./go/eggstests/eggstests', '-binaries-dir', build_valgrind, '-verbose', '-repo-dir', '.', '-tmp-dir', '.', '-short', '-filter', fuse_tests])
+    tests.append(['./go/eggstests/eggstests', '-binaries-dir', build_valgrind, '-verbose', '-repo-dir', '.', '-tmp-dir', '.', '-short', '-filter', fuse_tests] + logsdb)

 # we need three free ports, we get them here upfront rather than in shuckle to reduce
 # the chance of races -- if we got it from the integration tests it'll be while
 # tons of things are started in another integration test
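With these pieces in place, the LogsDB path can be exercised end to end: `./ci.py --integration --logsdb` (or `./integration.py --logsdb` directly) forwards `-use-logsdb` to the eggstests runs, which then spin up five replicas per shard, with replica 0 starting as leader and the rest as followers.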