ci: add ability to run with LogsDB, shard: add handling of LogsDB messages

Miroslav Crnic
2024-03-15 16:49:39 +00:00
committed by GitHub Enterprise
parent 74e81ca836
commit 27faaa45ae
14 changed files with 710 additions and 153 deletions

ci.py
View File

@@ -8,6 +8,7 @@ parser = argparse.ArgumentParser()
parser.add_argument('--functional', action='store_true')
parser.add_argument('--integration', action='store_true')
parser.add_argument('--short', action='store_true')
parser.add_argument('--logsdb', action='store_true')
parser.add_argument('--kmod', action='store_true')
parser.add_argument('--build', action='store_true')
parser.add_argument('--docker', action='store_true', help='Build and run in docker image')
@@ -58,11 +59,11 @@ if args.integration:
# the `--pids-limit -1` is not something I hit but it seems
# like a good idea.
run_cmd_unbuffered(
['docker', 'run', '--pids-limit', '-1', '--security-opt', 'seccomp=unconfined', '--cap-add', 'SYS_ADMIN', '-v', '/dev/fuse:/dev/fuse', '--privileged', '--rm', '-i', '--mount', f'type=bind,src={script_dir},dst=/eggsfs', '-e', f'UID={os.getuid()}', '-e', f'GID={os.getgid()}', container, '/eggsfs/integration.py', '--docker'] + (['--short'] if args.short else [])
['docker', 'run', '--pids-limit', '-1', '--security-opt', 'seccomp=unconfined', '--cap-add', 'SYS_ADMIN', '-v', '/dev/fuse:/dev/fuse', '--privileged', '--rm', '-i', '--mount', f'type=bind,src={script_dir},dst=/eggsfs', '-e', f'UID={os.getuid()}', '-e', f'GID={os.getgid()}', container, '/eggsfs/integration.py', '--docker'] + (['--short'] if args.short else []) + (['--logsdb'] if args.logsdb else [])
)
else:
run_cmd_unbuffered(
['./integration.py'] + (['--short'] if args.short else [])
['./integration.py'] + (['--short'] if args.short else []) + (['--logsdb'] if args.logsdb else [])
)
if args.prepare_image:
@@ -71,4 +72,4 @@ if args.prepare_image:
if args.kmod:
bold_print('kmod tests')
wait_cmd(run_cmd(['./kmod/ci.sh'] + (['-short'] if args.short else [])))
wait_cmd(run_cmd(['./kmod/ci.sh'] + (['-short'] if args.short else []) + (['--logsdb'] if args.logsdb else [])))

View File

@@ -360,6 +360,9 @@ struct BincodeBuf {
template<typename A>
void unpackList(BincodeList<A>& xs) {
xs.els.resize(unpackScalar<uint16_t>());
if (unlikely(xs.els.size() == 0)) {
return;
}
// If it's a number of some sort, just memcpy it
if constexpr (std::is_integral_v<A> || std::is_enum_v<A>) {
static_assert(std::endian::native == std::endian::little);
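The zero-length early return added above guards the memcpy fast path: for an empty list the vector's data pointer may be null, and memcpy on a null pointer is undefined behavior even with a zero count. A minimal standalone sketch of the guarded path (unpackIntegralList is a hypothetical name; the real continuation of unpackList is cut off by this hunk):

#include <cstring>
#include <vector>

// Hypothetical standalone version of the integral fast path above.
template<typename A>
void unpackIntegralList(std::vector<A>& els, const char*& cursor) {
    if (els.empty()) {
        return; // els.data() may be null; memcpy with a null pointer is UB even for 0 bytes
    }
    std::memcpy(els.data(), cursor, els.size() * sizeof(A));
    cursor += els.size() * sizeof(A);
}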

View File

@@ -1,7 +1,7 @@
#pragma once
#include <array>
#include <cstdint>
#include "Common.hpp"
static const std::array<uint8_t, 16> CDCKey{0xa1,0x11,0x1c,0xf0,0xf6,0x2b,0xba,0x02,0x25,0xd2,0x66,0xe7,0xa6,0x94,0x86,0xfe};

View File

@@ -125,7 +125,7 @@ public:
LogsDBLogEntry entry() const {
auto value = _smaller->value();
return LogsDBLogEntry{key(), {value.data(), value.data() + value.size()}};
return LogsDBLogEntry{key(), {(const uint8_t*)value.data(), (const uint8_t*)value.data() + value.size()}};
}
void dropEntry() {
@@ -1576,7 +1576,7 @@ public:
// A mismatch in responses could be due to network issues; we don't want to crash, so we ignore and retry.
// A mismatch in internal state is asserted on.
if (unlikely(request->replicaId != resp.header.requestId)) {
if (unlikely(request->replicaId != resp.replicaId)) {
LOG_ERROR(_env, "Expected response from replica %s, got it from replica %s. Response: %s", request->replicaId, resp.header.requestId, resp);
continue;
}
@@ -1625,6 +1625,7 @@ public:
break;
case LogMessageKind::LOG_READ:
_catchupReader.proccessLogReadRequest(req.replicaId, req.header.requestId, req.requestContainer.getLogRead());
break;
case LogMessageKind::NEW_LEADER:
_leaderElection.proccessNewLeaderRequest(req.replicaId, req.header.requestId, req.requestContainer.getNewLeader());
break;

View File

@@ -327,7 +327,6 @@ std::ostream& operator<<(std::ostream& out, BlockServiceId crc);
struct LogIdx {
uint64_t u64;
constexpr LogIdx(): u64(0) {}
constexpr LogIdx(uint64_t idx): u64(idx) {

View File

@@ -52,6 +52,10 @@ public:
}
}
bool isClosed() {
return _size.load(std::memory_order_relaxed) & (1ull<<31);
}
// Tries to push all the elements. Returns how many were actually
// pushed. First element in `els` gets pushed first. Returns 0
// if the queue is closed.
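isClosed() reads the closed flag out of bit 31 of the same atomic that holds the size. A minimal sketch of the matching close(), assuming that layout (the real close() is not part of this hunk):

void close() {
    // Bit 31 is the closed flag, the low bits are the element count;
    // relaxed ordering mirrors the load in isClosed().
    _size.fetch_or(1ull << 31, std::memory_order_relaxed);
}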

View File

@@ -1,13 +1,15 @@
#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <ostream>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <poll.h>
#include <unordered_map>
#include <vector>
#include "Assert.hpp"
@@ -20,8 +22,10 @@
#include "Msgs.hpp"
#include "Shard.hpp"
#include "Env.hpp"
#include "MsgsGen.hpp"
#include "MultiplexedChannel.hpp"
#include "ShardDB.hpp"
#include "ShardKey.hpp"
#include "CDCKey.hpp"
#include "SharedRocksDB.hpp"
#include "Shuckle.hpp"
@@ -53,6 +57,115 @@ const int LOG_ENTRIES_QUEUE_SIZE = 8192; // a few megabytes, should be quite a bit
const int MAX_WRITES_AT_ONCE = 200; // say that each write is ~100bytes, this gives us 20KB of write
const int MAX_RECV_MSGS = 100;
enum class WriterQueueEntryKind : uint8_t {
LOGSDB_REQUEST = 1,
LOGSDB_RESPONSE = 2,
SHARD_LOG_ENTRY = 3,
};
std::ostream& operator<<(std::ostream& out, WriterQueueEntryKind kind) {
switch (kind) {
case WriterQueueEntryKind::LOGSDB_REQUEST:
out << "LOGSDB_REQUEST";
break;
case WriterQueueEntryKind::LOGSDB_RESPONSE:
out << "LOGSDB_RESPONSE";
break;
case WriterQueueEntryKind::SHARD_LOG_ENTRY:
out << "SHARD_LOG_ENTRY";
break;
default:
out << "Unknown WriterQueueEntryKind(" << (uint8_t)kind << ")";
break;
}
return out;
}
class ShardWriterRequest {
public:
ShardWriterRequest() { clear(); }
ShardWriterRequest(const ShardWriterRequest& other) {
_kind = other._kind;
_data = other._data;
}
ShardWriterRequest(ShardWriterRequest&& other) {
*this = std::move(other);
}
ShardWriterRequest& operator=(ShardWriterRequest&& other) {
_kind = other._kind;
_data = std::move(other._data);
other.clear();
return *this;
}
void clear() { _kind = (WriterQueueEntryKind)0; }
WriterQueueEntryKind kind() const { return _kind; }
LogsDBRequest& setLogsDBRequest() {
_kind = WriterQueueEntryKind::LOGSDB_REQUEST;
auto& x = _data.emplace<0>();
return x;
}
const LogsDBRequest& getLogsDBRequest() const {
ALWAYS_ASSERT(_kind == WriterQueueEntryKind::LOGSDB_REQUEST, "%s != %s", _kind, WriterQueueEntryKind::LOGSDB_REQUEST);
return std::get<0>(_data);
}
LogsDBRequest&& moveLogsDBRequest() {
ALWAYS_ASSERT(_kind == WriterQueueEntryKind::LOGSDB_REQUEST, "%s != %s", _kind, WriterQueueEntryKind::LOGSDB_REQUEST);
clear();
return std::move(std::get<0>(_data));
}
LogsDBResponse& setLogsDBResponse() {
_kind = WriterQueueEntryKind::LOGSDB_RESPONSE;
auto& x = _data.emplace<1>();
return x;
}
const LogsDBResponse& getLogsDBResponse() const {
ALWAYS_ASSERT(_kind == WriterQueueEntryKind::LOGSDB_RESPONSE, "%s != %s", _kind, WriterQueueEntryKind::LOGSDB_RESPONSE);
return std::get<1>(_data);
}
LogsDBResponse&& moveLogsDBResponse() {
ALWAYS_ASSERT(_kind == WriterQueueEntryKind::LOGSDB_RESPONSE, "%s != %s", _kind, WriterQueueEntryKind::LOGSDB_RESPONSE);
clear();
return std::move(std::get<1>(_data));
}
QueuedShardLogEntry& setQueuedShardLogEntry() {
_kind = WriterQueueEntryKind::SHARD_LOG_ENTRY;
auto& x = _data.emplace<2>();
return x;
}
const QueuedShardLogEntry& getQueuedShardLogEntry() const {
ALWAYS_ASSERT(_kind == WriterQueueEntryKind::SHARD_LOG_ENTRY, "%s != %s", _kind, WriterQueueEntryKind::SHARD_LOG_ENTRY);
return std::get<2>(_data);
}
QueuedShardLogEntry&& moveQueuedShardLogEntry() {
ALWAYS_ASSERT(_kind == WriterQueueEntryKind::SHARD_LOG_ENTRY, "%s != %s", _kind, WriterQueueEntryKind::SHARD_LOG_ENTRY);
clear();
return std::move(std::get<2>(_data));
}
private:
WriterQueueEntryKind _kind;
std::variant<LogsDBRequest, LogsDBResponse, QueuedShardLogEntry> _data;
};
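ShardWriterRequest is a tagged union over the three writer-queue payloads: setX() selects the variant in place, getX() asserts the tag, and moveX() moves the payload out and clears the tag. A minimal usage sketch with the names from this diff (the replicaId value is illustrative):

ShardWriterRequest slot;
// Producer: pick the variant, then fill it directly in the queue slot.
LogsDBRequest& outgoing = slot.setLogsDBRequest();
outgoing.replicaId = 3; // example value
// Consumer: dispatch on the tag and take ownership of the payload.
switch (slot.kind()) {
case WriterQueueEntryKind::LOGSDB_REQUEST: {
    LogsDBRequest req = slot.moveLogsDBRequest(); // tag is cleared here
    break;
}
default:
    break;
}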
struct ReplicaAddrsInfo {
std::array<sockaddr_in, 2> addrs;
};
bool operator==(const sockaddr_in& l, const sockaddr_in& r) {
return l.sin_addr.s_addr == r.sin_addr.s_addr && l.sin_port == r.sin_port;
}
struct ShardShared {
SharedRocksDB& sharedDB;
@@ -63,15 +176,15 @@ struct ShardShared {
std::array<struct pollfd, 2> socks;
std::array<Timings, maxShardMessageKind+1> timings;
std::array<ErrorCount, maxShardMessageKind+1> errors;
SPSC<QueuedShardLogEntry> logEntriesQueue;
SPSC<ShardWriterRequest> writerRequestsQueue;
std::atomic<double> logEntriesQueueSize;
std::array<std::atomic<double>, 2> receivedRequests; // how many requests we got at once from each socket
std::atomic<double> pulledWriteRequests; // how many requests we got from write queue
std::array<AddrsInfo,5> replicas;
std::mutex replicasLock;
std::shared_ptr<std::array<ReplicaAddrsInfo, LogsDB::REPLICA_COUNT>> replicas;
std::atomic<bool> isLeader;
ShardShared() = delete;
ShardShared(SharedRocksDB& sharedDB_, BlockServicesCacheDB& blockServicesCache_, ShardDB& shardDB_): sharedDB(sharedDB_), blockServicesCache(blockServicesCache_), shardDB(shardDB_), ips{0, 0}, ports{0, 0}, logEntriesQueue(LOG_ENTRIES_QUEUE_SIZE), logEntriesQueueSize(0), pulledWriteRequests(0) {
ShardShared(SharedRocksDB& sharedDB_, BlockServicesCacheDB& blockServicesCache_, ShardDB& shardDB_): sharedDB(sharedDB_), blockServicesCache(blockServicesCache_), shardDB(shardDB_), ips{0, 0}, ports{0, 0}, writerRequestsQueue(LOG_ENTRIES_QUEUE_SIZE), logEntriesQueueSize(0), pulledWriteRequests(0) {
for (ShardMessageKind kind : allShardMessageKind) {
timings[(int)kind] = Timings::Standard();
}
@@ -101,7 +214,7 @@ static bool bigResponse(ShardMessageKind kind) {
);
}
static void packResponse(
static void packShardResponse(
Env& env,
ShardShared& shared,
std::vector<char>& sendBuf,
@@ -111,7 +224,7 @@ static void packResponse(
ShardMessageKind kind,
Duration elapsed,
bool dropArtificially,
struct sockaddr_in* clientAddr,
const struct sockaddr_in* clientAddr,
int sockIx,
EggsError err,
const ShardRespContainer& resp
@@ -156,7 +269,7 @@ static void packResponse(
// -- we'll fix up the actual values later.
auto& hdr = sendHdrs.emplace_back();
hdr.msg_hdr = {
.msg_name = clientAddr,
.msg_name = (sockaddr_in*)clientAddr,
.msg_namelen = sizeof(*clientAddr),
.msg_iovlen = 1,
};
@@ -178,6 +291,7 @@ private:
// run data
AES128Key _expandedCDCKey;
AES128Key _expandedShardKey;
// recvmmsg data (one per socket, since we receive from both and send from both
// using some of the header data)
std::array<std::vector<char>, 2> _recvBuf;
@@ -188,7 +302,7 @@ private:
ShardReqContainer _reqContainer;
ShardRespContainer _respContainer;
// log entries buffers
std::vector<QueuedShardLogEntry> _logEntries;
std::vector<ShardWriterRequest> _logEntries;
// sendmmsg data
std::vector<char> _sendBuf;
std::array<std::vector<struct mmsghdr>, 2> _sendHdrs; // one per socket
@@ -215,6 +329,7 @@ public:
};
convertProb("incoming", options.simulateIncomingPacketDrop, _incomingPacketDropProbability);
convertProb("outgoing", options.simulateOutgoingPacketDrop, _outgoingPacketDropProbability);
expandKey(ShardKey, _expandedShardKey);
_init();
}
@@ -281,7 +396,136 @@ private:
}
}
void _handleRequest(int sockIx, struct sockaddr_in* clientAddr, BincodeBuf& reqBbuf) {
void _handleLogsDBResponse(int sockIx, struct sockaddr_in* clientAddr, BincodeBuf& buf) {
LOG_DEBUG(_env, "received LogsDBResponse from %s", *clientAddr);
auto replicaId = _getReplicaId(clientAddr);
if (replicaId == LogsDB::REPLICA_COUNT) {
LOG_DEBUG(_env, "We can't match this address to replica. Dropping");
return;
}
auto& resp = _logEntries.emplace_back().setLogsDBResponse();
resp.replicaId = replicaId;
try {
resp.header.unpack(buf);
} catch (const BincodeException& err) {
LOG_ERROR(_env, "Could not parse LogsDBResponse.header: %s", err.what());
_logEntries.pop_back();
return;
}
if (wyhash64(&_packetDropRand) % 10'000 < _incomingPacketDropProbability) {
LOG_DEBUG(_env, "artificially dropping LogsDBResponse %s", resp.header.requestId);
_logEntries.pop_back();
return;
}
try {
resp.responseContainer.unpack(buf, resp.header.kind);
} catch (const BincodeException& exc) {
LOG_ERROR(_env, "Could not parse LogsDBResponse.responseContainer of kind %s: %s", resp.header.kind, exc.what());
_logEntries.pop_back();
return;
}
if (unlikely(buf.remaining() < 8)) {
LOG_ERROR(_env, "Could not parse LogsDBResponse of kind %s from %s. Message signature is 8 bytes, only %s remaining", resp.header.kind, *clientAddr, buf.remaining());
_logEntries.pop_back();
return;
}
auto expectedMac = cbcmac(_expandedShardKey, buf.data, buf.cursor - buf.data);
BincodeFixedBytes<8> receivedMac;
buf.unpackFixedBytes<8>(receivedMac);
if (unlikely(expectedMac != receivedMac.data)) {
LOG_ERROR(_env, "Incorrect signature for LogsDBResponse %s from %s", resp.header.kind, *clientAddr);
_logEntries.pop_back();
return;
}
if (unlikely(buf.remaining())) {
LOG_ERROR(_env, "Malformed message. Extra %s bytes for LogsDBResponse %s from %s", buf.remaining(), resp.header.kind, *clientAddr);
_logEntries.pop_back();
return;
}
LOG_DEBUG(_env, "Received response %s for request id %s from replica id %s", resp.header.kind, resp.header.requestId, resp.replicaId);
}
void _handleLogsDBRequest(int sockIx, struct sockaddr_in* clientAddr, BincodeBuf& buf) {
LOG_DEBUG(_env, "received LogsDBRequest from %s", *clientAddr);
auto replicaId = _getReplicaId(clientAddr);
if (replicaId == LogsDB::REPLICA_COUNT) {
LOG_DEBUG(_env, "We can't match this address to replica. Dropping");
return;
}
auto& req = _logEntries.emplace_back().setLogsDBRequest();
req.replicaId = replicaId;
try {
req.header.unpack(buf);
} catch (const BincodeException& err) {
LOG_ERROR(_env, "Could not parse LogsDBRequest.header: %s", err.what());
_logEntries.pop_back();
return;
}
if (wyhash64(&_packetDropRand) % 10'000 < _incomingPacketDropProbability) {
LOG_DEBUG(_env, "artificially dropping LogsDBRequest %s", req.header.requestId);
_logEntries.pop_back();
return;
}
try {
req.requestContainer.unpack(buf, req.header.kind);
} catch (const BincodeException& exc) {
LOG_ERROR(_env, "Could not parse LogsDBRequest.requestContainer of kind %s: %s", req.header.kind, exc.what());
_logEntries.pop_back();
return;
}
if (unlikely(buf.remaining() < 8)) {
LOG_ERROR(_env, "Could not parse LogsDBRequest of kind %s from %s, message signature is 8 bytes, only %s remaining", req.header.kind, *clientAddr, buf.remaining());
_logEntries.pop_back();
return;
}
auto expectedMac = cbcmac(_expandedShardKey, buf.data, buf.cursor - buf.data);
BincodeFixedBytes<8> receivedMac;
buf.unpackFixedBytes<8>(receivedMac);
if (unlikely(expectedMac != receivedMac.data)) {
LOG_ERROR(_env, "Incorrect signature for LogsDBRequest %s from %s", req.header.kind, *clientAddr);
_logEntries.pop_back();
return;
}
if (unlikely(buf.remaining())) {
LOG_ERROR(_env, "Malformed message. Extra %s bytes for LogsDBRequest %s from %s", buf.remaining(), req.header.kind, *clientAddr);
_logEntries.pop_back();
return;
}
LOG_DEBUG(_env, "Received request %s with request id %s from replica id %s", req.header.kind, req.header.requestId, req.replicaId);
}
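Both handlers above check the same wire layout: a packed header, a kind-specific body, then a trailing 8-byte CBC-MAC over everything before it, keyed with the expanded shard key. A condensed sketch of just that authentication step, using the same calls the handlers use (verifyTrailingMac is a hypothetical helper):

// buf.data points at the start of the packed message; after unpacking the
// header and container, buf.cursor sits just past the body.
bool verifyTrailingMac(const AES128Key& key, BincodeBuf& buf) {
    if (buf.remaining() < 8) {
        return false; // MAC missing or truncated
    }
    auto expected = cbcmac(key, buf.data, buf.cursor - buf.data);
    BincodeFixedBytes<8> received;
    buf.unpackFixedBytes<8>(received);
    // reject both bad signatures and trailing garbage
    return expected == received.data && buf.remaining() == 0;
}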
uint8_t _getReplicaId(sockaddr_in* clientAddress) {
auto replicasPtr = std::atomic_load(&_shared.replicas); // snapshot: the registerer publishes via atomic_exchange
if (!replicasPtr) {
return LogsDB::REPLICA_COUNT;
}
for (ReplicaId replicaId = 0; replicaId.u8 < replicasPtr->size(); ++replicaId.u8) {
auto& replica = replicasPtr->at(replicaId.u8);
if (replica.addrs[0] == *clientAddress || replica.addrs[1] == *clientAddress) {
return replicaId.u8;
}
}
return LogsDB::REPLICA_COUNT;
}
void _handleShardRequest(int sockIx, struct sockaddr_in* clientAddr, BincodeBuf& reqBbuf) {
LOG_DEBUG(_env, "received message from %s", *clientAddr);
// First, try to parse the header
@@ -325,13 +569,21 @@ private:
err = EggsError::MALFORMED_REQUEST;
}
// authenticate, if necessary
if (isPrivilegedRequestKind(reqHeader.kind)) {
auto expectedMac = cbcmac(_expandedCDCKey, reqBbuf.data, reqBbuf.cursor - reqBbuf.data);
BincodeFixedBytes<8> receivedMac;
reqBbuf.unpackFixedBytes<8>(receivedMac);
if (expectedMac != receivedMac.data) {
err = EggsError::NOT_AUTHORISED;
if (likely(err == NO_ERROR)) {
// authenticate, if necessary
if (isPrivilegedRequestKind(reqHeader.kind)) {
if (unlikely(reqBbuf.remaining() < 8)) {
LOG_ERROR(_env, "Could not parse request of kind %s from %s, message signature is 8 bytes, only %s remaining", reqHeader.kind, *clientAddr, reqBbuf.remaining());
RAISE_ALERT(_env, "could not parse request of kind %s from %s, will reply with error.", reqHeader.kind, *clientAddr);
err = EggsError::MALFORMED_REQUEST;
} else {
auto expectedMac = cbcmac(_expandedCDCKey, reqBbuf.data, reqBbuf.cursor - reqBbuf.data);
BincodeFixedBytes<8> receivedMac;
reqBbuf.unpackFixedBytes<8>(receivedMac);
if (expectedMac != receivedMac.data) {
err = EggsError::NOT_AUTHORISED;
}
}
}
}
@@ -348,7 +600,7 @@ private:
if (readOnlyShardReq(_reqContainer.kind())) {
err = _shared.shardDB.read(_reqContainer, _respContainer);
} else {
auto& entry = _logEntries.emplace_back();
auto& entry = _logEntries.emplace_back().setQueuedShardLogEntry();
entry.sockIx = sockIx;
entry.clientAddr = *clientAddr;
entry.receivedAt = t0;
@@ -365,8 +617,7 @@ private:
Duration elapsed = eggsNow() - t0;
bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
packResponse(_env, _shared, _sendBuf, _sendHdrs[sockIx], _sendVecs[sockIx], reqHeader.requestId, reqHeader.kind, elapsed, dropArtificially, clientAddr, sockIx, err, _respContainer);
packShardResponse(_env, _shared, _sendBuf, _sendHdrs[sockIx], _sendVecs[sockIx], reqHeader.requestId, reqHeader.kind, elapsed, dropArtificially, clientAddr, sockIx, err, _respContainer);
}
public:
@@ -407,8 +658,16 @@ public:
}
}
for (auto& msg : _channel.getProtocolMessages(LOG_RESP_PROTOCOL_VERSION)) {
_handleLogsDBResponse(msg.socketId, msg.clientAddr, msg.buf);
}
for (auto& msg : _channel.getProtocolMessages(LOG_REQ_PROTOCOL_VERSION)) {
_handleLogsDBRequest(msg.socketId, msg.clientAddr, msg.buf);
}
for (auto& msg : _channel.getProtocolMessages(SHARD_REQ_PROTOCOL_VERSION)) {
_handleRequest(msg.socketId, msg.clientAddr, msg.buf);
_handleShardRequest(msg.socketId, msg.clientAddr, msg.buf);
}
@@ -417,8 +676,8 @@ public:
size_t numLogEntries = _logEntries.size();
if (numLogEntries > 0) {
LOG_DEBUG(_env, "pushing %s log entries to writer", numLogEntries);
uint32_t pushed = _shared.logEntriesQueue.push(_logEntries);
_shared.logEntriesQueueSize = _shared.logEntriesQueueSize*0.95 + _shared.logEntriesQueue.size()*0.05;
uint32_t pushed = _shared.writerRequestsQueue.push(_logEntries);
_shared.logEntriesQueueSize = _shared.logEntriesQueueSize*0.95 + _shared.writerRequestsQueue.size()*0.05;
if (pushed < numLogEntries) {
LOG_INFO(_env, "tried to push %s elements to write queue, but pushed %s instead", numLogEntries, pushed);
}
@@ -452,15 +711,22 @@ public:
struct ShardWriter : Loop {
private:
ShardShared& _shared;
AES128Key _expandedShardKey;
uint64_t _currentLogIndex;
ShardRespContainer _respContainer;
std::vector<ShardWriterRequest> _requests;
std::vector<QueuedShardLogEntry> _logEntries;
std::vector<LogsDBRequest> _logsdbRequests;
std::vector<LogsDBResponse> _logsdbResponses;
std::vector<LogsDBRequest *> outRequests;
std::vector<LogsDBResponse> outResponses;
std::unique_ptr<LogsDB> _logsDB;
const bool _dontWaitForReplication;
std::vector<LogsDBRequest> _logsDBRequests;
std::vector<LogsDBResponse> _logsDBResponses;
std::vector<LogsDBRequest *> _logsDBOutRequests;
std::vector<LogsDBResponse> _logsDBOutResponses;
std::unordered_map<uint64_t, QueuedShardLogEntry> _inFlightEntries;
std::vector<QueuedShardLogEntry> _outgoingLogEntries;
std::shared_ptr<std::array<ReplicaAddrsInfo, LogsDB::REPLICA_COUNT>> _replicaInfo;
// sendmmsg data (one per socket)
std::vector<char> _sendBuf;
@@ -472,17 +738,19 @@ private:
uint64_t _outgoingPacketDropProbability; // probability * 10,000
virtual void sendStop() override {
_shared.logEntriesQueue.close();
_shared.writerRequestsQueue.close();
}
public:
ShardWriter(Logger& logger, std::shared_ptr<XmonAgent>& xmon, ShardReplicaId shrid, const ShardOptions& options, ShardShared& shared) :
Loop(logger, xmon, "writer"),
_shared(shared),
_dontWaitForReplication(options.dontWaitForReplication),
_packetDropRand(eggsNow().ns),
_incomingPacketDropProbability(0),
_outgoingPacketDropProbability(0)
{
expandKey(ShardKey, _expandedShardKey);
_currentLogIndex = _shared.shardDB.lastAppliedLogEntry();
auto convertProb = [this](const std::string& what, double prob, uint64_t& iprob) {
if (prob != 0.0) {
@@ -493,109 +761,324 @@ public:
};
convertProb("incoming", options.simulateIncomingPacketDrop, _incomingPacketDropProbability);
convertProb("outgoing", options.simulateOutgoingPacketDrop, _outgoingPacketDropProbability);
_logEntries.reserve(MAX_WRITES_AT_ONCE);
_requests.reserve(MAX_WRITES_AT_ONCE);
if (options.writeToLogsDB) {
_logsDB.reset(new LogsDB(_env,_shared.sharedDB,shrid.replicaId(), _currentLogIndex, options.dontWaitForReplication, options.dontDoReplication, options.forceLeader, options.avoidBeingLeader, options.initialStart, options.initialStart ? _currentLogIndex : 0));
_logsDB->processIncomingMessages(_logsdbRequests, _logsdbResponses);
_logsDB->processIncomingMessages(_logsDBRequests, _logsDBResponses);
_shared.isLeader.store(_logsDB->isLeader(), std::memory_order_relaxed);
} else {
ALWAYS_ASSERT(shrid.replicaId() == 0);
_shared.isLeader.store(true, std::memory_order_relaxed);
}
}
virtual ~ShardWriter() = default;
virtual void step() override {
_logEntries.clear();
_sendBuf.clear();
for (int i = 0; i < _sendHdrs.size(); i++) {
_sendHdrs[i].clear();
_sendVecs[i].clear();
}
uint32_t pulled = _shared.logEntriesQueue.pull(_logEntries, MAX_WRITES_AT_ONCE);
if (likely(pulled > 0)) {
LOG_DEBUG(_env, "pulled %s requests from write queue", pulled);
_shared.pulledWriteRequests = _shared.pulledWriteRequests*0.95 + ((double)pulled)*0.05;
} else {
// queue is closed, stop
stop();
}
void logsDBStep() {
LOG_DEBUG(_env, "Calling LogsDB processIncomingMessages with %s requests and %s responses", _logsDBRequests.size(), _logsDBResponses.size());
_logsDB->processIncomingMessages(_logsDBRequests,_logsDBResponses);
_shared.isLeader.store(_logsDB->isLeader(), std::memory_order_relaxed);
std::vector<LogsDBLogEntry> entries;
if (_logsDB) {
// If we are leader write any outstanding entries
std::vector<LogsDBLogEntry> logsDBEntries;
if (!_logsDB->isLeader()) {
//TODO: we could notify clients we are no longer leader
if (_logEntries.size() > 0) {
LOG_INFO(_env, "Received %s shard write requests but we are not log leader. dropping", _logEntries.size());
_logEntries.clear();
}
if (_inFlightEntries.size() > 0) {
LOG_INFO(_env, "There are %s in flight shard write requests but we are no longer log leader. dropping", _inFlightEntries.size());
_inFlightEntries.clear();
}
} else if (_logEntries.size()) {
logsDBEntries.reserve(_logEntries.size());
std::vector<uint8_t> data;
data.resize(MAX_UDP_MTU);
entries.reserve(_logEntries.size());
for (auto& logEntry : _logEntries) {
entries.emplace_back();
auto& entry = entries.back();
logsDBEntries.reserve(_requests.size());
LogIdx expectedIdx = _currentLogIndex + _inFlightEntries.size();
for (auto& queuedLogEntry : _logEntries) {
auto& logsDBEntry = logsDBEntries.emplace_back();
queuedLogEntry.logEntry.idx = ++expectedIdx;
BincodeBuf buf((char*)&data[0], MAX_UDP_MTU);
logEntry.logEntry.pack(buf);
entry.value.assign(buf.data, buf.cursor);
queuedLogEntry.logEntry.pack(buf);
logsDBEntry.value.assign(buf.data, buf.cursor);
}
auto err = _logsDB->appendEntries(entries);
auto err = _logsDB->appendEntries(logsDBEntries);
ALWAYS_ASSERT(err == NO_ERROR);
_logsDB->processIncomingMessages(_logsdbRequests, _logsdbResponses);
_logsDB->getOutgoingMessages(outRequests, outResponses);
entries.clear();
ALWAYS_ASSERT(_logEntries.size() == logsDBEntries.size());
for (size_t i = 0; i < _logEntries.size(); ++i) {
if (logsDBEntries[i].idx == 0) {
// if we don't wait for replication, the window gets cleared immediately
ALWAYS_ASSERT(!_dontWaitForReplication);
ALWAYS_ASSERT(_inFlightEntries.size() == LogsDB::IN_FLIGHT_APPEND_WINDOW);
LOG_INFO(_env, "Appended %s out of %s shard write requests. Log in flight windows is full. Dropping other entries", i, _logEntries.size());
break;
}
ALWAYS_ASSERT(logsDBEntries[i].idx == _logEntries[i].logEntry.idx);
_inFlightEntries.emplace(_logEntries[i].logEntry.idx.u64, std::move(_logEntries[i]));
}
logsDBEntries.clear();
_logsDB->readEntries(entries);
ALWAYS_ASSERT(entries.size() == _logEntries.size());
if (_dontWaitForReplication) {
// usually the state machine is advanced by responses; if we don't expect any, we advance it manually
_logsDBRequests.clear();
_logsDBResponses.clear();
_logsDB->processIncomingMessages(_logsDBRequests, _logsDBResponses);
}
}
size_t entriesIdx = 0;
for (auto& logEntry : _logEntries) {
// The log is not chatty when inactive, but its messages are higher priority since they move the state machine forward under high load.
// We therefore give them priority when sending out.
_logsDB->getOutgoingMessages(_logsDBOutRequests, _logsDBOutResponses);
for (auto& response : _logsDBOutResponses) {
packLogsDBResponse(response);
}
for (auto request : _logsDBOutRequests) {
packLogsDBRequest(*request);
}
_logsDB->readEntries(logsDBEntries);
_outgoingLogEntries.reserve(logsDBEntries.size());
if (_dontWaitForReplication && _logsDB->isLeader()) {
ALWAYS_ASSERT(_inFlightEntries.size() == logsDBEntries.size());
}
for (auto& logsDBEntry : logsDBEntries) {
++_currentLogIndex;
ALWAYS_ASSERT(_currentLogIndex == logsDBEntry.idx);
ALWAYS_ASSERT(logsDBEntry.value.size() > 0);
BincodeBuf buf((char*)&logsDBEntry.value.front(), logsDBEntry.value.size());
ShardLogEntry shardEntry;
shardEntry.unpack(buf);
ALWAYS_ASSERT(_currentLogIndex == shardEntry.idx);
auto it = _inFlightEntries.find(shardEntry.idx.u64);
if (_dontWaitForReplication && _logsDB->isLeader()) {
ALWAYS_ASSERT(it != _inFlightEntries.end());
ALWAYS_ASSERT(shardEntry == it->second.logEntry);
}
auto err = _shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, _respContainer);
if (it != _inFlightEntries.end()) {
auto& logEntry = _outgoingLogEntries.emplace_back(std::move(it->second));
_inFlightEntries.erase(it);
ALWAYS_ASSERT(shardEntry == logEntry.logEntry);
if (likely(logEntry.requestId)) {
LOG_DEBUG(_env, "applying log entry for request %s kind %s from %s", logEntry.requestId, logEntry.requestKind, logEntry.clientAddr);
} else {
LOG_DEBUG(_env, "applying request-less log entry");
}
if (likely(logEntry.requestId)) {
Duration elapsed = eggsNow() - logEntry.receivedAt;
bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
packShardResponse(_env, _shared, _sendBuf, _sendHdrs[logEntry.sockIx], _sendVecs[logEntry.sockIx], logEntry.requestId, logEntry.requestKind, elapsed, dropArtificially, &logEntry.clientAddr, logEntry.sockIx, err, _respContainer);
} else if (unlikely(err != NO_ERROR)) {
RAISE_ALERT(_env, "could not apply request-less log entry: %s", err);
}
}
}
_shared.shardDB.flush(true);
_logsDB->flush(true);
sendMessages();
}
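With the removed lines interleaved above, the new leader-side flow is hard to follow; as an outline, logsDBStep() performs the following sequence (a condensed sketch of this hunk, not a verbatim excerpt):

// 1. processIncomingMessages(requests, responses) and refresh isLeader.
// 2. Leader only: stamp each queued ShardLogEntry with the next expected
//    idx (current index plus in-flight count), pack it, appendEntries().
//    Entries returned with idx == 0 did not fit the in-flight window and
//    are dropped; the rest move into _inFlightEntries keyed by idx.
// 3. getOutgoingMessages(); pack LogsDB responses first, then requests.
// 4. readEntries() yields committed entries in idx order; each one is
//    unpacked, applied via shardDB.applyLogEntry(), and matched against
//    _inFlightEntries to pack the client's response.
// 5. shardDB.flush(true), then _logsDB->flush(true), then sendMessages() --
//    replies may only leave after the flushes make the writes durable.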
void noLogsDbStep() {
for (auto& logEntry : _logEntries) {
if (likely(logEntry.requestId)) {
LOG_DEBUG(_env, "applying log entry for request %s kind %s from %s", logEntry.requestId, logEntry.requestKind, logEntry.clientAddr);
} else {
LOG_DEBUG(_env, "applying request-less log entry");
}
_currentLogIndex++;
EggsError err = NO_ERROR;
if (_logsDB) {
auto& logsdbEntry = entries[entriesIdx++];
ALWAYS_ASSERT(_currentLogIndex == logsdbEntry.idx);
ALWAYS_ASSERT(logsdbEntry.value.size() > 0);
BincodeBuf buf((char*)&logsdbEntry.value.front(), logsdbEntry.value.size());
ShardLogEntry shardEntry;
shardEntry.unpack(buf);
ALWAYS_ASSERT(shardEntry == logEntry.logEntry);
err = _shared.shardDB.applyLogEntry(logsdbEntry.idx.u64, shardEntry, _respContainer);
} else {
err = _shared.shardDB.applyLogEntry(_currentLogIndex, logEntry.logEntry, _respContainer);
}
auto err = _shared.shardDB.applyLogEntry(++_currentLogIndex, logEntry.logEntry, _respContainer);
if (likely(logEntry.requestId)) {
Duration elapsed = eggsNow() - logEntry.receivedAt;
bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
packResponse(_env, _shared, _sendBuf, _sendHdrs[logEntry.sockIx], _sendVecs[logEntry.sockIx], logEntry.requestId, logEntry.requestKind, elapsed, dropArtificially, &logEntry.clientAddr, logEntry.sockIx, err, _respContainer);
packShardResponse(_env, _shared, _sendBuf, _sendHdrs[logEntry.sockIx], _sendVecs[logEntry.sockIx], logEntry.requestId, logEntry.requestKind, elapsed, dropArtificially, &logEntry.clientAddr, logEntry.sockIx, err, _respContainer);
} else if (unlikely(err != NO_ERROR)) {
RAISE_ALERT(_env, "could not apply request-less log entry: %s", err);
}
}
if (pulled > 0) {
LOG_DEBUG(_env, "flushing and sending %s writes", pulled);
if (_requests.size() > 0) {
LOG_DEBUG(_env, "flushing and sending %s writes", _requests.size());
_shared.shardDB.flush(true);
// important to send all of them after the flush! otherwise it's not durable yet
for (int i = 0; i < _sendHdrs.size(); i++) {
if (_sendHdrs[i].size() == 0) { continue; }
LOG_DEBUG(_env, "sending %s write responses to socket %s", _sendHdrs[i].size(), i);
for (int j = 0; j < _sendHdrs[i].size(); j++) {
auto& vec = _sendVecs[i][j];
vec.iov_base = &_sendBuf[(size_t)vec.iov_base];
auto& hdr = _sendHdrs[i][j];
hdr.msg_hdr.msg_iov = &vec;
}
int ret = sendmmsg(_shared.socks[i].fd, &_sendHdrs[i][0], _sendHdrs[i].size(), 0);
if (unlikely(ret < 0)) {
// we get this when netfilter drops packets
if (errno != EPERM) {
throw SYSCALL_EXCEPTION("sendto");
} else {
LOG_INFO(_env, "dropping %s responses because of EPERM", _sendHdrs[i].size());
}
} else if (unlikely(ret < _sendHdrs[i].size())) {
LOG_INFO(_env, "dropping %s out of %s requests since `sendmmsg` could not send them all", _sendHdrs[i].size()-ret, _sendHdrs[i].size());
}
sendMessages();
}
}
ReplicaAddrsInfo* addressFromReplicaId(ReplicaId id) {
if (!_replicaInfo) {
return nullptr;
}
auto& replicaInfo = (*_replicaInfo)[id.u8];
if (replicaInfo.addrs[0].sin_addr.s_addr == 0) {
return nullptr;
}
return &replicaInfo;
}
void packLogsDBResponse(LogsDBResponse& response) {
auto addrInfoPtr = addressFromReplicaId(response.replicaId);
if (unlikely(addrInfoPtr == nullptr)) {
LOG_DEBUG(_env, "No information for replica id %s. dropping response", response.replicaId);
return;
}
auto& addrInfo = *addrInfoPtr;
bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
if (unlikely(dropArtificially)) {
LOG_DEBUG(_env, "artificially dropping response %s", response.header.requestId);
return;
}
// pack into sendBuf
size_t sendBufBegin = _sendBuf.size();
_sendBuf.resize(sendBufBegin + MAX_UDP_MTU);
BincodeBuf respBbuf(&_sendBuf[sendBufBegin], MAX_UDP_MTU);
response.header.pack(respBbuf);
response.responseContainer.pack(respBbuf);
respBbuf.packFixedBytes<8>({cbcmac(_expandedShardKey, respBbuf.data, respBbuf.cursor - respBbuf.data)});
_sendBuf.resize(sendBufBegin + respBbuf.len());
auto now = eggsNow(); // randomly pick one of the shard addrs and one of our sockets
int whichReplicaAddr = now.ns & !!addrInfo.addrs[1].sin_port;
int whichSock = (now.ns>>1) & !!_shared.ips[1];
LOG_DEBUG(_env, "will send response for req id %s kind %s to %s", response.header.requestId, response.header.kind, addrInfo.addrs[whichReplicaAddr]);
// Prepare sendmmsg stuff. The vectors might be resized by the
// time we get to sending this, so store references when we must
// -- we'll fix up the actual values later.
auto& hdr = _sendHdrs[whichSock].emplace_back();
hdr.msg_hdr = {
.msg_name = (sockaddr_in*)&addrInfo.addrs[whichReplicaAddr],
.msg_namelen = sizeof(addrInfo.addrs[whichReplicaAddr]),
.msg_iovlen = 1,
};
hdr.msg_len = respBbuf.len();
auto& vec = _sendVecs[whichSock].emplace_back();
vec.iov_base = (void*)sendBufBegin;
vec.iov_len = respBbuf.len();
}
void packLogsDBRequest(LogsDBRequest& request) {
auto addrInfoPtr = addressFromReplicaId(request.replicaId);
if (unlikely(addrInfoPtr == nullptr)) {
LOG_DEBUG(_env, "No information for replica id %s. dropping request", request.replicaId);
return;
}
auto& addrInfo = *addrInfoPtr;
bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
if (unlikely(dropArtificially)) {
LOG_DEBUG(_env, "artificially dropping request %s", request.header.requestId);
return;
}
// pack into sendBuf
size_t sendBufBegin = _sendBuf.size();
_sendBuf.resize(sendBufBegin + MAX_UDP_MTU);
BincodeBuf respBbuf(&_sendBuf[sendBufBegin], MAX_UDP_MTU);
request.header.pack(respBbuf);
request.requestContainer.pack(respBbuf);
respBbuf.packFixedBytes<8>({cbcmac(_expandedShardKey, respBbuf.data, respBbuf.cursor - respBbuf.data)});
_sendBuf.resize(sendBufBegin + respBbuf.len());
request.sentTime = eggsNow(); // randomly pick one of the shard addrs and one of our sockets
int whichReplicaAddr = request.sentTime.ns & !!addrInfo.addrs[1].sin_port;
int whichSock = (request.sentTime.ns>>1) & !!_shared.ips[1];
LOG_DEBUG(_env, "will send request for req id %s kind %s to %s", request.header.requestId, request.header.kind, addrInfo.addrs[whichReplicaAddr]);
// Prepare sendmmsg stuff. The vectors might be resized by the
// time we get to sending this, so store references when we must
// -- we'll fix up the actual values later.
auto& hdr = _sendHdrs[whichSock].emplace_back();
hdr.msg_hdr = {
.msg_name = (sockaddr_in*)&addrInfo.addrs[whichReplicaAddr],
.msg_namelen = sizeof(addrInfo.addrs[whichReplicaAddr]),
.msg_iovlen = 1,
};
hdr.msg_len = respBbuf.len();
auto& vec = _sendVecs[whichSock].emplace_back();
vec.iov_base = (void*)sendBufBegin;
vec.iov_len = respBbuf.len();
}
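The `& !!...` expression in both packers is a guarded coin flip: `!!addrs[1].sin_port` is 1 only when a second replica address is configured, so ANDing it with a low clock bit picks address 0 or 1 at random, and always 0 otherwise (the second socket is chosen the same way). The idiom in isolation:

// Picks one of two addresses: a pseudo-random clock bit when a second
// address exists (nonzero port), otherwise always index 0.
int pickAddrIndex(uint64_t nowNs, uint16_t secondPort) {
    return (int)(nowNs & (uint64_t)!!secondPort);
}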
void sendMessages() {
for (int i = 0; i < _sendHdrs.size(); i++) {
if (_sendHdrs[i].size() == 0) { continue; }
LOG_DEBUG(_env, "sending %s messages to socket %s", _sendHdrs[i].size(), i);
for (int j = 0; j < _sendHdrs[i].size(); j++) {
auto& vec = _sendVecs[i][j];
vec.iov_base = &_sendBuf[(size_t)vec.iov_base];
auto& hdr = _sendHdrs[i][j];
hdr.msg_hdr.msg_iov = &vec;
}
int ret = sendmmsg(_shared.socks[i].fd, &_sendHdrs[i][0], _sendHdrs[i].size(), 0);
if (unlikely(ret < 0)) {
// we get this when netfilter drops packets
if (errno != EPERM) {
throw SYSCALL_EXCEPTION("sendto");
} else {
LOG_INFO(_env, "dropping %s messages because of EPERM", _sendHdrs[i].size());
}
} else if (unlikely(ret < _sendHdrs[i].size())) {
LOG_INFO(_env, "dropping %s out of %s messages since `sendmmsg` could not send them all", _sendHdrs[i].size()-ret, _sendHdrs[i].size());
}
}
}
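Note the fixup loop above: packLogsDBRequest()/packLogsDBResponse() store a byte offset in iov_base because _sendBuf can reallocate between packing and sending, and the offset is only rebased onto the final buffer address right before sendmmsg(). The pattern in isolation (a sketch with a hypothetical payload size):

#include <sys/uio.h>
#include <vector>

// Pack-then-fixup: iov_base holds a byte offset while packing, and is
// rebased onto the final buffer address only right before sending.
void packAndFixup(std::vector<char>& sendBuf, std::vector<struct iovec>& vecs, size_t payloadLen) {
    size_t begin = sendBuf.size();          // remember the offset: data() may move
    sendBuf.resize(begin + payloadLen);
    vecs.push_back(iovec{.iov_base = (void*)begin, .iov_len = payloadLen});
    // ... more messages may be packed here, resizing sendBuf ...
    for (auto& vec : vecs) {                // right before sendmmsg()
        vec.iov_base = &sendBuf[(size_t)vec.iov_base];
    }
}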
virtual void step() override {
_requests.clear();
_logsDBRequests.clear();
_logsDBResponses.clear();
_logsDBOutRequests.clear();
_logsDBOutResponses.clear();
_logEntries.clear();
_outgoingLogEntries.clear();
_sendBuf.clear();
for (int i = 0; i < _sendHdrs.size(); i++) {
_sendHdrs[i].clear();
_sendVecs[i].clear();
}
_replicaInfo = std::atomic_load(&_shared.replicas); // snapshot once per step
uint32_t pulled = _shared.writerRequestsQueue.pull(_requests, MAX_WRITES_AT_ONCE, _logsDB ? _logsDB->getNextTimeout() : -1);
if (likely(pulled > 0)) {
LOG_DEBUG(_env, "pulled %s requests from write queue", pulled);
_shared.pulledWriteRequests = _shared.pulledWriteRequests*0.95 + ((double)pulled)*0.05;
}
if (unlikely(_shared.writerRequestsQueue.isClosed())) {
// queue is closed, stop
stop();
return;
}
for (auto& request : _requests) {
switch (request.kind()) {
case WriterQueueEntryKind::LOGSDB_REQUEST:
_logsDBRequests.emplace_back(request.moveLogsDBRequest());
break;
case WriterQueueEntryKind::LOGSDB_RESPONSE:
_logsDBResponses.emplace_back(request.moveLogsDBResponse());
break;
case WriterQueueEntryKind::SHARD_LOG_ENTRY:
_logEntries.emplace_back(request.moveQueuedShardLogEntry());
break;
}
}
if (_logsDB) {
logsDBStep();
} else {
noLogsDbStep();
}
}
};
@@ -612,7 +1095,7 @@ private:
AddrsInfo _info;
bool _infoLoaded;
bool _registerCompleted;
uint8_t _leaderReplicaId;
std::array<AddrsInfo, 5> _replicas;
public:
ShardRegisterer(Logger& logger, std::shared_ptr<XmonAgent>& xmon, ShardReplicaId shrid, const ShardOptions& options, ShardShared& shared) :
PeriodicLoop(logger, xmon, "registerer", {1_sec, 1, 1_mins, 0.1}),
@@ -622,8 +1105,7 @@ public:
_shucklePort(options.shucklePort),
_hasSecondIp(options.ipPorts[1].port != 0),
_infoLoaded(false),
_registerCompleted(false),
_leaderReplicaId(options.leaderReplicaId)
_registerCompleted(false)
{}
virtual ~ShardRegisterer() = default;
@@ -668,14 +1150,28 @@ public:
_env.updateAlert(_alert, "AddrsInfo in shuckle: %s , not matching local AddrsInfo: %s", replicas[_shrid.replicaId().u8], _info);
return false;
}
{
std::lock_guard guard(_shared.replicasLock);
_shared.replicas = replicas;
if (_replicas != replicas) {
_replicas = replicas;
auto replicaUpdatePtr = std::make_shared<std::array<ReplicaAddrsInfo, LogsDB::REPLICA_COUNT>>();
auto& replicaUpdate = *replicaUpdatePtr;
for (size_t i = 0; i < _replicas.size(); ++i) {
uint32_t ip;
memcpy(&ip, _replicas[i].ip1.data.data(), _replicas[i].ip1.data.size());
replicaUpdate[i].addrs[0].sin_family = AF_INET;
replicaUpdate[i].addrs[0].sin_addr.s_addr = ip;
memcpy(&ip, _replicas[i].ip2.data.data(), _replicas[i].ip2.data.size());
replicaUpdate[i].addrs[1].sin_family = AF_INET;
replicaUpdate[i].addrs[1].sin_addr.s_addr = ip;
replicaUpdate[i].addrs[0].sin_port = htons(_replicas[i].port1);
replicaUpdate[i].addrs[1].sin_port = htons(_replicas[i].port2);
}
std::atomic_exchange(&_shared.replicas, replicaUpdatePtr);
}
}
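The registerer publishes the converted address table by swapping in a fresh shared_ptr, and the reader/writer threads take their own snapshots; a minimal sketch of that publish/snapshot pairing (pre-C++20 shared_ptr atomics, as used here):

#include <array>
#include <memory>

std::shared_ptr<std::array<ReplicaAddrsInfo, LogsDB::REPLICA_COUNT>> shared;

// Publisher: build a new immutable table, then swap it in atomically.
auto fresh = std::make_shared<std::array<ReplicaAddrsInfo, LogsDB::REPLICA_COUNT>>();
std::atomic_exchange(&shared, fresh);

// Reader: snapshot once per step; the snapshot stays valid even if the
// publisher swaps again, since the shared_ptr keeps the old table alive.
auto snapshot = std::atomic_load(&shared);
if (snapshot) { /* safe to index *snapshot */ }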
LOG_INFO(_env, "Registering ourselves (shard %s, %s) with shuckle", _shrid, _info);
err = registerShardReplica(_shuckleHost, _shucklePort, 10_sec, _shrid, _shrid.replicaId() == _leaderReplicaId, _info);
err = registerShardReplica(_shuckleHost, _shucklePort, 10_sec, _shrid, _shared.isLeader.load(std::memory_order_relaxed), _info);
if (!err.empty()) {
_env.updateAlert(_alert, "Couldn't register ourselves with shuckle: %s", err);
return false;

View File

@@ -123,6 +123,7 @@ static int pickMtu(uint16_t mtu) {
void ShardLogEntry::pack(BincodeBuf& buf) const {
buf.packScalar<uint32_t>(SHARD_LOG_PROTOCOL_VERSION);
idx.pack(buf);
time.pack(buf);
buf.packScalar<uint16_t>((uint16_t)body.kind());
body.pack(buf);
@@ -131,6 +132,7 @@ void ShardLogEntry::pack(BincodeBuf& buf) const {
void ShardLogEntry::unpack(BincodeBuf& buf) {
uint32_t protocol = buf.unpackScalar<uint32_t>();
ALWAYS_ASSERT(protocol == SHARD_LOG_PROTOCOL_VERSION);
idx.unpack(buf);
time.unpack(buf);
ShardLogEntryKind kind = (ShardLogEntryKind)buf.unpackScalar<uint16_t>();
body.unpack(buf, kind);

View File

@@ -12,6 +12,7 @@
struct ShardLogEntry {
LogIdx idx;
EggsTime time;
ShardLogEntryContainer body;

cpp/shard/ShardKey.hpp Normal file
View File

@@ -0,0 +1,7 @@
#pragma once
#include <array>
#include <cstdint>
static const std::array<uint8_t, 16> ShardKey{0x0d, 0x57, 0x82, 0xed, 0x77, 0x90, 0xd6, 0x69, 0x0f, 0x7d, 0x22, 0x44, 0x19, 0x5a, 0xff, 0x0b};

View File

@@ -39,6 +39,7 @@ func main() {
xmon := flag.String("xmon", "", "")
shuckleScriptsJs := flag.String("shuckle-scripts-js", "", "")
noFuse := flag.Bool("no-fuse", false, "")
useLogsDB := flag.Bool("use-logsdb", false, "Spin up replicas for shard. 0 is leader, rest followers")
flag.Parse()
noRunawayArgs()
@@ -189,25 +190,40 @@ func main() {
procs.StartCDC(log, *repoDir, &opts)
}
replicaCount := uint8(1)
if *useLogsDB {
replicaCount = 5
}
// Start shards
for i := 0; i < 256; i++ {
shid := msgs.ShardId(i)
opts := managedprocess.ShardOpts{
Exe: cppExes.ShardExe,
Dir: path.Join(*dataDir, fmt.Sprintf("shard_%03d", i)),
LogLevel: level,
Shid: shid,
Valgrind: *buildType == "valgrind",
ShuckleAddress: shuckleAddress,
Perf: *profile,
Xmon: *xmon,
for r := uint8(0); r < replicaCount; r++ {
shrid := msgs.MakeShardReplicaId(msgs.ShardId(i), msgs.ReplicaId(r))
opts := managedprocess.ShardOpts{
Exe: cppExes.ShardExe,
Shrid: shrid,
Dir: path.Join(*dataDir, fmt.Sprintf("shard_%03d", i)),
LogLevel: level,
Valgrind: *buildType == "valgrind",
ShuckleAddress: shuckleAddress,
Perf: *profile,
Xmon: *xmon,
UseLogsDB: "",
}
if *useLogsDB {
opts.Dir = path.Join(*dataDir, fmt.Sprintf("shard_%03d_%d", i, r))
if r == 0 {
opts.UseLogsDB = "LEADER"
} else {
opts.UseLogsDB = "FOLLOWER"
}
}
if *startingPort != 0 {
opts.Addr1 = fmt.Sprintf("127.0.0.1:%v", uint16(*startingPort)+1+uint16(i)+uint16(r)*256)
} else {
opts.Addr1 = "127.0.0.1:0"
}
procs.StartShard(log, *repoDir, &opts)
}
if *startingPort != 0 {
opts.Addr1 = fmt.Sprintf("127.0.0.1:%v", uint16(*startingPort)+1+uint16(i))
} else {
opts.Addr1 = "127.0.0.1:0"
}
procs.StartShard(log, *repoDir, &opts)
}
fmt.Printf("waiting for shards/cdc for %v...\n", waitShuckleFor)

View File

@@ -853,6 +853,7 @@ func main() {
blockServiceKiller := flag.Bool("block-service-killer", false, "Go around killing block services to stimulate paths recovering from that.")
race := flag.Bool("race", false, "Go race detector")
shuckleBeaconPort := flag.Uint("shuckle-beacon-port", 0, "")
useLogsDB := flag.Bool("use-logsdb", false, "Spin up replicas for shard. 0 is leader, rest followers")
flag.Var(&overrides, "cfg", "Config overrides")
flag.Parse()
noRunawayArgs()
@@ -1106,9 +1107,11 @@ func main() {
}
// Start CDC
cdcOpts := &managedprocess.CDCOpts{
Exe: cppExes.CDCExe,
Dir: path.Join(*dataDir, "cdc"),
ReplicaId: msgs.ReplicaId(0),
LogLevel: level,
Valgrind: *buildType == "valgrind",
Perf: *profile,
@@ -1122,25 +1125,41 @@ func main() {
}
procs.StartCDC(log, *repoDir, cdcOpts)
replicaCount := uint8(1)
if *useLogsDB {
replicaCount = 5
}
// Start shards
numShards := 256
for i := 0; i < numShards; i++ {
shid := msgs.ShardId(i)
shopts := managedprocess.ShardOpts{
Exe: cppExes.ShardExe,
Dir: path.Join(*dataDir, fmt.Sprintf("shard_%03d", i)),
LogLevel: level,
Shid: shid,
Valgrind: *buildType == "valgrind",
Perf: *profile,
IncomingPacketDrop: *incomingPacketDrop,
OutgoingPacketDrop: *outgoingPacketDrop,
ShuckleAddress: shuckleAddress,
Addr1: "127.0.0.1:0",
Addr2: "127.0.0.1:0",
TransientDeadlineInterval: &testTransientDeadlineInterval,
for r := uint8(0); r < replicaCount; r++ {
shrid := msgs.MakeShardReplicaId(msgs.ShardId(i), msgs.ReplicaId(r))
shopts := managedprocess.ShardOpts{
Exe: cppExes.ShardExe,
Dir: path.Join(*dataDir, fmt.Sprintf("shard_%03d", i)),
LogLevel: level,
Shrid: shrid,
Valgrind: *buildType == "valgrind",
Perf: *profile,
IncomingPacketDrop: *incomingPacketDrop,
OutgoingPacketDrop: *outgoingPacketDrop,
ShuckleAddress: shuckleAddress,
Addr1: "127.0.0.1:0",
Addr2: "127.0.0.1:0",
TransientDeadlineInterval: &testTransientDeadlineInterval,
UseLogsDB: "",
}
if *useLogsDB {
shopts.Dir = path.Join(*dataDir, fmt.Sprintf("shard_%03d_%d", i, r))
if r == 0 {
shopts.UseLogsDB = "LEADER"
} else {
shopts.UseLogsDB = "FOLLOWER"
}
}
procs.StartShard(log, *repoDir, &shopts)
}
procs.StartShard(log, *repoDir, &shopts)
}
// now wait for shards/cdc

View File

@@ -461,8 +461,8 @@ func BuildGoExes(ll *lib.Logger, repoDir string, race bool) *GoExes {
type ShardOpts struct {
Exe string
Dir string
Shrid msgs.ShardReplicaId
LogLevel lib.LogLevel
Shid msgs.ShardId
Valgrind bool
Perf bool
IncomingPacketDrop float64
@@ -472,6 +472,7 @@ type ShardOpts struct {
Addr2 string
TransientDeadlineInterval *time.Duration
Xmon string
UseLogsDB string
}
func (procs *ManagedProcesses) StartShard(ll *lib.Logger, repoDir string, opts *ShardOpts) {
@@ -495,6 +496,9 @@ func (procs *ManagedProcesses) StartShard(ll *lib.Logger, repoDir string, opts *
if opts.Xmon != "" {
args = append(args, "-xmon", opts.Xmon)
}
if opts.UseLogsDB != "" {
args = append(args, "-use-logsdb", opts.UseLogsDB)
}
switch opts.LogLevel {
case lib.TRACE:
args = append(args, "-log-level", "trace")
@@ -507,11 +511,12 @@ func (procs *ManagedProcesses) StartShard(ll *lib.Logger, repoDir string, opts *
}
args = append(args,
opts.Dir,
fmt.Sprintf("%d", int(opts.Shid)),
fmt.Sprintf("%d", int(opts.Shrid.Shard())),
fmt.Sprintf("%d", int(opts.Shrid.Replica())),
)
cppDir := cppDir(repoDir)
mpArgs := ManagedProcessArgs{
Name: fmt.Sprintf("shard %v", opts.Shid),
Name: fmt.Sprintf("shard %v", opts.Shrid),
Exe: opts.Exe,
Args: args,
StdoutFile: path.Join(opts.Dir, "stdout"),
@@ -553,6 +558,7 @@ func (procs *ManagedProcesses) StartShard(ll *lib.Logger, repoDir string, opts *
type CDCOpts struct {
Exe string
Dir string
ReplicaId msgs.ReplicaId
LogLevel lib.LogLevel
Valgrind bool
Perf bool
@@ -592,10 +598,10 @@ func (procs *ManagedProcesses) StartCDC(ll *lib.Logger, repoDir string, opts *CD
case lib.ERROR:
args = append(args, "-log-level", "error")
}
args = append(args, opts.Dir)
args = append(args, opts.Dir, fmt.Sprintf("%d", int(opts.ReplicaId)))
cppDir := cppDir(repoDir)
mpArgs := ManagedProcessArgs{
Name: "cdc",
Name: fmt.Sprintf("cdc %v", opts.ReplicaId),
Exe: opts.Exe,
Args: args,
StdoutFile: path.Join(opts.Dir, "stdout"),

View File

@@ -9,6 +9,7 @@ from common import *
parser = argparse.ArgumentParser()
parser.add_argument('--short', action='store_true')
parser.add_argument('--docker', action='store_true')
parser.add_argument('--logsdb', action='store_true')
args = parser.parse_args()
script_dir = os.path.dirname(os.path.realpath(__file__))
@@ -39,16 +40,17 @@ else:
bold_print('integration tests')
short = ['-short'] if args.short else []
logsdb = ['-use-logsdb'] if args.logsdb else []
# -block-service-killer does not work with FUSE driver (the duplicated FDs
# of the child processes confuse the FUSE driver).
tests = [
['./go/eggstests/eggstests', '-binaries-dir', build_sanitized, '-verbose', '-repo-dir', '.', '-tmp-dir', '.', '-filter', fuse_tests, '-outgoing-packet-drop', '0.02'] + short,
['./go/eggstests/eggstests', '-binaries-dir', build_release, '-preserve-data-dir', '-verbose', '-block-service-killer', '-filter', 'direct', '-repo-dir', '.', '-tmp-dir', '.'] + short,
['./go/eggstests/eggstests', '-binaries-dir', build_sanitized, '-verbose', '-repo-dir', '.', '-tmp-dir', '.', '-filter', fuse_tests, '-outgoing-packet-drop', '0.02'] + short + logsdb,
['./go/eggstests/eggstests', '-binaries-dir', build_release, '-preserve-data-dir', '-verbose', '-block-service-killer', '-filter', 'direct', '-repo-dir', '.', '-tmp-dir', '.'] + short + logsdb,
]
if not args.short:
# valgrind is super slow; it has still surfaced bugs in the past, so we run
# it, but only the short versions, and only in the long test runs.
tests.append(['./go/eggstests/eggstests', '-binaries-dir', build_valgrind, '-verbose', '-repo-dir', '.', '-tmp-dir', '.', '-short', '-filter', fuse_tests])
tests.append(['./go/eggstests/eggstests', '-binaries-dir', build_valgrind, '-verbose', '-repo-dir', '.', '-tmp-dir', '.', '-short', '-filter', fuse_tests] + logsdb)
# we need three free ports; we grab them upfront here rather than in shuckle to reduce
# the chance of races -- if we grabbed them from the integration tests, it would happen
# while tons of things are being started in another integration test