// Mirror of https://github.com/XTXMarkets/ternfs.git
// (synced 2026-05-11 23:02:27 -05:00; 1502 lines, 62 KiB, C++)
#include <array>
#include <atomic>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <memory>
#include <ostream>
#include <type_traits>
#include <unordered_map>
#include <variant>
#include <vector>

#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <poll.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <unistd.h>

#include "Assert.hpp"
#include "Bincode.hpp"
#include "BlockServicesCacheDB.hpp"
#include "Common.hpp"
#include "Crypto.hpp"
#include "Exception.hpp"
#include "LogsDB.hpp"
#include "Msgs.hpp"
#include "Shard.hpp"
#include "Env.hpp"
#include "MsgsGen.hpp"
#include "MultiplexedChannel.hpp"
#include "ShardDB.hpp"
#include "ShardKey.hpp"
#include "CDCKey.hpp"
#include "SharedRocksDB.hpp"
#include "Shuckle.hpp"
#include "Time.hpp"
#include "Time.hpp"
#include "wyhash.h"
#include "Xmon.hpp"
#include "Timings.hpp"
#include "ErrorCount.hpp"
#include "PeriodicLoop.hpp"
#include "Metrics.hpp"
#include "Loop.hpp"
#include "SPSC.hpp"
|
|
|
|
struct QueuedShardLogEntry {
|
|
ShardLogEntry logEntry;
|
|
// if requestId == 0, the rest is all garbage -- we don't need it.
|
|
// this is for log entries that are not generated by users (e.g.
|
|
// block service updates).
|
|
uint64_t requestId;
|
|
EggsTime receivedAt;
|
|
struct sockaddr_in clientAddr;
|
|
int sockIx; // which sock to use to reply
|
|
ShardMessageKind requestKind;
|
|
};
|
|
|
|
// TODO make options

// Capacity of the server -> writer request queue. A few megabytes; it should
// be quite a bit bigger than MAX_WRITES_AT_ONCE.
static constexpr int LOG_ENTRIES_QUEUE_SIZE = 8192;

// Say that each write is ~100bytes, this gives us 20KB of write.
static constexpr int MAX_WRITES_AT_ONCE = 200;

// Upper bound on datagrams pulled from one socket by a single recvmmsg call.
static constexpr int MAX_RECV_MSGS = 100;
|
|
|
|
// Discriminator for the payload carried by a ShardWriterRequest.
// 0 is intentionally not an enumerator: it is used as the "empty" value.
enum class WriterQueueEntryKind : uint8_t {
    LOGSDB_REQUEST  = 1, // payload is a LogsDBRequest
    LOGSDB_RESPONSE = 2, // payload is a LogsDBResponse
    SHARD_LOG_ENTRY = 3, // payload is a QueuedShardLogEntry
};
|
|
|
|
std::ostream& operator<<(std::ostream& out, WriterQueueEntryKind kind) {
|
|
switch (kind) {
|
|
case WriterQueueEntryKind::LOGSDB_REQUEST:
|
|
out << "LOGSDB_REQUEST";
|
|
break;
|
|
case WriterQueueEntryKind::LOGSDB_RESPONSE:
|
|
out << "LOGSDB_RESPONSE";
|
|
break;
|
|
case WriterQueueEntryKind::SHARD_LOG_ENTRY:
|
|
out << "SHARD_LOG_ENTRY";
|
|
break;
|
|
default:
|
|
out << "Unknown WriterQueueEntryKind(" << (uint8_t)kind << ")";
|
|
break;
|
|
}
|
|
return out;
|
|
}
|
|
class ShardWriterRequest {
|
|
public:
|
|
ShardWriterRequest() { clear(); }
|
|
ShardWriterRequest(const ShardWriterRequest& other) {
|
|
_kind = other._kind;
|
|
_data = other._data;
|
|
}
|
|
|
|
ShardWriterRequest(ShardWriterRequest&& other) {
|
|
*this = std::move(other);
|
|
}
|
|
|
|
ShardWriterRequest& operator=(ShardWriterRequest&& other) {
|
|
_kind = other._kind;
|
|
_data = std::move(other._data);
|
|
other.clear();
|
|
return *this;
|
|
}
|
|
|
|
void clear() { _kind = (WriterQueueEntryKind)0; }
|
|
|
|
WriterQueueEntryKind kind() const { return _kind; }
|
|
|
|
LogsDBRequest& setLogsDBRequest() {
|
|
_kind = WriterQueueEntryKind::LOGSDB_REQUEST;
|
|
auto& x = _data.emplace<0>();
|
|
return x;
|
|
}
|
|
|
|
const LogsDBRequest& getLogsDBRequest() const {
|
|
ALWAYS_ASSERT(_kind == WriterQueueEntryKind::LOGSDB_REQUEST, "%s != %s", _kind, WriterQueueEntryKind::LOGSDB_REQUEST);
|
|
return std::get<0>(_data);
|
|
}
|
|
|
|
LogsDBRequest&& moveLogsDBRequest() {
|
|
ALWAYS_ASSERT(_kind == WriterQueueEntryKind::LOGSDB_REQUEST, "%s != %s", _kind, WriterQueueEntryKind::LOGSDB_REQUEST);
|
|
clear();
|
|
return std::move(std::get<0>(_data));
|
|
}
|
|
|
|
LogsDBResponse& setLogsDBResponse() {
|
|
_kind = WriterQueueEntryKind::LOGSDB_RESPONSE;
|
|
auto& x = _data.emplace<1>();
|
|
return x;
|
|
}
|
|
|
|
const LogsDBResponse& getLogsDBResponse() const {
|
|
ALWAYS_ASSERT(_kind == WriterQueueEntryKind::LOGSDB_RESPONSE, "%s != %s", _kind, WriterQueueEntryKind::LOGSDB_RESPONSE);
|
|
return std::get<1>(_data);
|
|
}
|
|
|
|
LogsDBResponse&& moveLogsDBResponse() {
|
|
ALWAYS_ASSERT(_kind == WriterQueueEntryKind::LOGSDB_RESPONSE, "%s != %s", _kind, WriterQueueEntryKind::LOGSDB_RESPONSE);
|
|
clear();
|
|
return std::move(std::get<1>(_data));
|
|
}
|
|
|
|
QueuedShardLogEntry& setQueuedShardLogEntry() {
|
|
_kind = WriterQueueEntryKind::SHARD_LOG_ENTRY;
|
|
auto& x = _data.emplace<2>();
|
|
return x;
|
|
}
|
|
|
|
const QueuedShardLogEntry& getQueuedShardLogEntry() const {
|
|
ALWAYS_ASSERT(_kind == WriterQueueEntryKind::SHARD_LOG_ENTRY, "%s != %s", _kind, WriterQueueEntryKind::SHARD_LOG_ENTRY);
|
|
return std::get<2>(_data);
|
|
}
|
|
|
|
QueuedShardLogEntry&& moveQueuedShardLogEntry() {
|
|
ALWAYS_ASSERT(_kind == WriterQueueEntryKind::SHARD_LOG_ENTRY, "%s != %s", _kind, WriterQueueEntryKind::SHARD_LOG_ENTRY);
|
|
clear();
|
|
return std::move(std::get<2>(_data));
|
|
}
|
|
|
|
private:
|
|
WriterQueueEntryKind _kind;
|
|
std::variant<LogsDBRequest, LogsDBResponse, QueuedShardLogEntry> _data;
|
|
};
|
|
|
|
// The two IPv4 addresses of a single LogsDB replica. An incoming packet is
// attributed to the replica if its source matches either slot (presumably
// one per replica socket -- confirm with whoever populates this).
struct ReplicaAddrsInfo {
    std::array<struct sockaddr_in, 2> addrs;
};
|
|
|
|
// Two IPv4 socket addresses are equal when both address and port match
// (compared as stored, i.e. in network byte order). Other fields such as
// sin_family are ignored. Takes const references so const addresses can be
// compared too; non-const callers are unaffected.
bool operator==(const sockaddr_in& l, const sockaddr_in& r) {
    return l.sin_addr.s_addr == r.sin_addr.s_addr && l.sin_port == r.sin_port;
}
|
|
|
|
struct ShardShared {
|
|
SharedRocksDB& sharedDB;
|
|
BlockServicesCacheDB& blockServicesCache;
|
|
ShardDB& shardDB;
|
|
std::array<std::atomic<uint32_t>, 2> ips;
|
|
std::array<std::atomic<uint32_t>, 2> ports;
|
|
std::array<struct pollfd, 2> socks;
|
|
std::array<Timings, maxShardMessageKind+1> timings;
|
|
std::array<ErrorCount, maxShardMessageKind+1> errors;
|
|
SPSC<ShardWriterRequest> writerRequestsQueue;
|
|
std::atomic<double> logEntriesQueueSize;
|
|
std::array<std::atomic<double>, 2> receivedRequests; // how many requests we got at once from each socket
|
|
std::atomic<double> pulledWriteRequests; // how many requests we got from write queue
|
|
std::shared_ptr<std::array<ReplicaAddrsInfo, LogsDB::REPLICA_COUNT>> replicas;
|
|
std::atomic<bool> isLeader;
|
|
|
|
ShardShared() = delete;
|
|
ShardShared(SharedRocksDB& sharedDB_, BlockServicesCacheDB& blockServicesCache_, ShardDB& shardDB_): sharedDB(sharedDB_), blockServicesCache(blockServicesCache_), shardDB(shardDB_), ips{0, 0}, ports{0, 0}, writerRequestsQueue(LOG_ENTRIES_QUEUE_SIZE), logEntriesQueueSize(0), pulledWriteRequests(0) {
|
|
for (ShardMessageKind kind : allShardMessageKind) {
|
|
timings[(int)kind] = Timings::Standard();
|
|
}
|
|
for (auto& x: receivedRequests) {
|
|
x = 0;
|
|
}
|
|
}
|
|
};
|
|
|
|
// True for request kinds whose bodies are large; their parsed contents are
// only logged at TRACE level.
static bool bigRequest(ShardMessageKind kind) {
    const bool isAddSpan =
        kind == ShardMessageKind::ADD_SPAN_CERTIFY ||
        kind == ShardMessageKind::ADD_SPAN_INITIATE;
    return unlikely(isAddSpan);
}
|
|
|
|
// True for response kinds whose bodies are large; their contents are only
// logged at TRACE level.
static bool bigResponse(ShardMessageKind kind) {
    bool big = false;
    switch (kind) {
    case ShardMessageKind::READ_DIR:
    case ShardMessageKind::ADD_SPAN_INITIATE:
    case ShardMessageKind::FILE_SPANS:
    case ShardMessageKind::VISIT_DIRECTORIES:
    case ShardMessageKind::VISIT_FILES:
    case ShardMessageKind::VISIT_TRANSIENT_FILES:
    case ShardMessageKind::BLOCK_SERVICE_FILES:
    case ShardMessageKind::FULL_READ_DIR:
        big = true;
        break;
    default:
        break;
    }
    return unlikely(big);
}
|
|
|
|
// Serializes the reply to one shard request into `sendBuf`. The reply is
// either `resp` or, when `err != NO_ERROR`, an ERROR message carrying `err`.
// Timing/error stats are recorded under the request `kind`. Unless the reply
// is dropped artificially, a sendmmsg header and iovec are appended to
// `sendHdrs` / `sendVecs`.
//
// `sendBuf` may be reallocated by later calls, so the iovec's iov_base holds
// the *offset* of the reply inside `sendBuf`, not a pointer. The caller must
// rebase it onto the buffer and set msg_iov right before sending.
// `clientAddr` is stored by pointer and must outlive the send. `sockIx` is
// currently unused.
static void packShardResponse(
    Env& env,
    ShardShared& shared,
    std::vector<char>& sendBuf,
    std::vector<struct mmsghdr>& sendHdrs,
    std::vector<struct iovec>& sendVecs,
    uint64_t requestId,
    ShardMessageKind kind,
    Duration elapsed,
    bool dropArtificially,
    const struct sockaddr_in* clientAddr,
    int sockIx,
    EggsError err,
    const ShardRespContainer& resp
) {
    // pack into sendBuf, reserving a full MTU and shrinking afterwards
    size_t sendBufBegin = sendBuf.size();
    sendBuf.resize(sendBufBegin + MAX_UDP_MTU);
    BincodeBuf respBbuf(&sendBuf[sendBufBegin], MAX_UDP_MTU);
    if (err == NO_ERROR) {
        LOG_DEBUG(env, "successfully processed request %s with kind %s in %s", requestId, kind, elapsed);
        if (bigResponse(kind)) {
            if (unlikely(env._shouldLog(LogLevel::LOG_TRACE))) {
                LOG_TRACE(env, "resp body: %s", resp);
            } else {
                LOG_DEBUG(env, "resp body: <omitted>");
            }
        } else {
            LOG_DEBUG(env, "resp body: %s", resp);
        }
        ShardResponseHeader(requestId, kind).pack(respBbuf);
        resp.pack(respBbuf);
    } else {
        // Include the request id so failures can be correlated with the request.
        LOG_DEBUG(env, "request %s kind %s failed with error %s in %s", requestId, kind, err, elapsed);
        ShardResponseHeader(requestId, ShardMessageKind::ERROR).pack(respBbuf);
        respBbuf.packScalar<uint16_t>((uint16_t)err);
    }
    sendBuf.resize(sendBufBegin + respBbuf.len());

    // Stats are recorded even for replies we then drop on purpose.
    shared.timings[(int)kind].add(elapsed);
    shared.errors[(int)kind].add(err);

    if (unlikely(dropArtificially)) {
        LOG_DEBUG(env, "artificially dropping response %s", requestId);
        sendBuf.resize(sendBufBegin);
        return;
    }

    LOG_DEBUG(env, "will send response for req id %s kind %s to %s", requestId, kind, *clientAddr);

    // Prepare sendmmsg stuff. The vectors might be resized by the
    // time we get to sending this, so store references when we must
    // -- we'll fix up the actual values later.
    auto& hdr = sendHdrs.emplace_back();
    hdr.msg_hdr = {
        .msg_name = (sockaddr_in*)clientAddr,
        .msg_namelen = sizeof(*clientAddr),
        .msg_iovlen = 1,
    };
    hdr.msg_len = respBbuf.len();
    auto& vec = sendVecs.emplace_back();
    vec.iov_base = (void*)sendBufBegin; // offset into sendBuf, rebased before sending
    vec.iov_len = respBbuf.len();
}
|
|
|
|
struct ShardServer : Loop {
|
|
private:
|
|
// init data
|
|
ShardShared& _shared;
|
|
ShardReplicaId _shrid;
|
|
std::array<IpPort, 2> _ipPorts;
|
|
uint64_t _packetDropRand;
|
|
uint64_t _incomingPacketDropProbability; // probability * 10,000
|
|
uint64_t _outgoingPacketDropProbability; // probability * 10,000
|
|
|
|
// run data
|
|
AES128Key _expandedCDCKey;
|
|
AES128Key _expandedShardKey;
|
|
// recvmmsg data (one per socket, since we receive from both and send from both
|
|
// using some of the header data)
|
|
std::array<std::vector<char>, 2> _recvBuf;
|
|
std::array<std::vector<struct mmsghdr>, 2> _recvHdrs;
|
|
std::array<std::vector<struct sockaddr_in>, 2> _recvAddrs;
|
|
std::array<std::vector<struct iovec>, 2> _recvVecs;
|
|
// what we parse into
|
|
ShardReqContainer _reqContainer;
|
|
ShardRespContainer _respContainer;
|
|
// log entries buffers
|
|
std::vector<ShardWriterRequest> _logEntries;
|
|
// sendmmsg data
|
|
std::vector<char> _sendBuf;
|
|
std::array<std::vector<struct mmsghdr>, 2> _sendHdrs; // one per socket
|
|
std::array<std::vector<struct iovec>, 2> _sendVecs;
|
|
MultiplexedChannel<3, std::array<uint32_t, 3>{SHARD_REQ_PROTOCOL_VERSION, LOG_REQ_PROTOCOL_VERSION, LOG_RESP_PROTOCOL_VERSION}> _channel;
|
|
|
|
public:
|
|
ShardServer(Logger& logger, std::shared_ptr<XmonAgent>& xmon, ShardReplicaId shrid, const ShardOptions& options, ShardShared& shared) :
|
|
Loop(logger, xmon, "server"),
|
|
_shared(shared),
|
|
_shrid(shrid),
|
|
_ipPorts(options.ipPorts),
|
|
_packetDropRand(eggsNow().ns),
|
|
_incomingPacketDropProbability(0),
|
|
_outgoingPacketDropProbability(0),
|
|
_channel(_env, MAX_RECV_MSGS * _ipPorts.size())
|
|
{
|
|
auto convertProb = [this](const std::string& what, double prob, uint64_t& iprob) {
|
|
if (prob != 0.0) {
|
|
LOG_INFO(_env, "Will drop %s%% of %s packets", prob*100.0, what);
|
|
iprob = prob * 10'000.0;
|
|
ALWAYS_ASSERT(iprob > 0 && iprob < 10'000);
|
|
}
|
|
};
|
|
convertProb("incoming", options.simulateIncomingPacketDrop, _incomingPacketDropProbability);
|
|
convertProb("outgoing", options.simulateOutgoingPacketDrop, _outgoingPacketDropProbability);
|
|
expandKey(ShardKey, _expandedShardKey);
|
|
|
|
_init();
|
|
}
|
|
|
|
virtual ~ShardServer() = default;
|
|
|
|
private:
|
|
void _init() {
|
|
LOG_INFO(_env, "initializing server sockets");
|
|
expandKey(CDCKey, _expandedCDCKey);
|
|
|
|
memset(_shared.socks.data(), 0, _shared.socks.size()*sizeof(_shared.socks[0]));
|
|
for (int i = 0; i < _ipPorts.size(); i++) {
|
|
const auto& ipPort = _ipPorts[i];
|
|
if (ipPort.ip == 0) { break; }
|
|
int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
|
|
if (sock < 0) {
|
|
throw SYSCALL_EXCEPTION("cannot create socket");
|
|
}
|
|
struct sockaddr_in serverAddr;
|
|
serverAddr.sin_family = AF_INET;
|
|
uint32_t ipn = htonl(ipPort.ip);
|
|
memcpy(&serverAddr.sin_addr.s_addr, &ipn, sizeof(ipn));
|
|
serverAddr.sin_port = htons(ipPort.port);
|
|
if (bind(sock, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) != 0) {
|
|
char ip[INET_ADDRSTRLEN];
|
|
throw SYSCALL_EXCEPTION("cannot bind socket to addr %s:%s", inet_ntop(AF_INET, &serverAddr.sin_addr, ip, INET_ADDRSTRLEN), ipPort.port);
|
|
}
|
|
{
|
|
socklen_t addrLen = sizeof(serverAddr);
|
|
if (getsockname(sock, (struct sockaddr*)&serverAddr, &addrLen) < 0) {
|
|
throw SYSCALL_EXCEPTION("getsockname");
|
|
}
|
|
}
|
|
// stats are ~50byte, and are the most common request, say 100byte
|
|
// per request on average, 1MiB buffer should be enough for 10k requests
|
|
// or so in each of the two queues.
|
|
{
|
|
int bufSize = 1<<20;
|
|
if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (void*)&bufSize, sizeof(bufSize)) < 0) {
|
|
throw SYSCALL_EXCEPTION("setsockopt");
|
|
}
|
|
}
|
|
// store addresses/fds
|
|
_shared.ips[i].store(ntohl(serverAddr.sin_addr.s_addr), std::memory_order_release);
|
|
_shared.ports[i].store(ntohs(serverAddr.sin_port), std::memory_order_release);
|
|
_shared.socks[i].fd = sock;
|
|
_shared.socks[i].events = POLL_IN;
|
|
LOG_INFO(_env, "Bound shard %s to %s", _shrid, serverAddr);
|
|
|
|
_recvBuf[i].resize(DEFAULT_UDP_MTU*MAX_RECV_MSGS);
|
|
_recvHdrs[i].resize(MAX_RECV_MSGS);
|
|
memset(_recvHdrs[i].data(), 0, sizeof(_recvHdrs[i][0])*MAX_RECV_MSGS);
|
|
_recvAddrs[i].resize(MAX_RECV_MSGS);
|
|
_recvVecs[i].resize(MAX_RECV_MSGS);
|
|
for (int j = 0; j < _recvVecs[i].size(); j++) {
|
|
_recvVecs[i][j].iov_base = &_recvBuf[i][j*DEFAULT_UDP_MTU];
|
|
_recvVecs[i][j].iov_len = DEFAULT_UDP_MTU;
|
|
_recvHdrs[i][j].msg_hdr.msg_iov = &_recvVecs[i][j];
|
|
_recvHdrs[i][j].msg_hdr.msg_iovlen = 1;
|
|
_recvHdrs[i][j].msg_hdr.msg_namelen = sizeof(_recvAddrs[i][j]);
|
|
_recvHdrs[i][j].msg_hdr.msg_name = &_recvAddrs[i][j];
|
|
}
|
|
}
|
|
}
|
|
|
|
void _handleLogsDBResponse(int sockIx, struct sockaddr_in* clientAddr, BincodeBuf& buf) {
|
|
LOG_DEBUG(_env, "received LogsDBResponse from %s", *clientAddr);
|
|
|
|
auto replicaId = _getReplicaId(clientAddr);
|
|
|
|
if (replicaId == LogsDB::REPLICA_COUNT) {
|
|
LOG_DEBUG(_env, "We can't match this address to replica. Dropping");
|
|
return;
|
|
}
|
|
|
|
auto& resp = _logEntries.emplace_back().setLogsDBResponse();
|
|
resp.replicaId = replicaId;
|
|
|
|
try {
|
|
resp.header.unpack(buf);
|
|
} catch (const BincodeException& err) {
|
|
LOG_ERROR(_env, "Could not parse LogsDBResponse.header: %s", err.what());
|
|
_logEntries.pop_back();
|
|
return;
|
|
}
|
|
|
|
if (wyhash64(&_packetDropRand) % 10'000 < _incomingPacketDropProbability) {
|
|
LOG_DEBUG(_env, "artificially dropping LogsDBResponse %s", resp.header.requestId);
|
|
_logEntries.pop_back();
|
|
return;
|
|
}
|
|
|
|
try {
|
|
resp.responseContainer.unpack(buf, resp.header.kind);
|
|
} catch (const BincodeException& exc) {
|
|
LOG_ERROR(_env, "Could not parse LogsDBResponse.responseContainer of kind %s: %s", resp.header.kind, exc.what());
|
|
_logEntries.pop_back();
|
|
return;
|
|
}
|
|
|
|
if (unlikely(buf.remaining() < 8)) {
|
|
LOG_ERROR(_env, "Could not parse LogsDBResponse of kind %s from %s. Message signature is 8 bytes, only %s remaining", resp.header.kind, *clientAddr, buf.remaining());
|
|
_logEntries.pop_back();
|
|
return;
|
|
}
|
|
|
|
auto expectedMac = cbcmac(_expandedShardKey, buf.data, buf.cursor - buf.data);
|
|
BincodeFixedBytes<8> receivedMac;
|
|
buf.unpackFixedBytes<8>(receivedMac);
|
|
if (unlikely(expectedMac != receivedMac.data)) {
|
|
LOG_ERROR(_env, "Incorrect signature for LogsDBResponse %s from %s", resp.header.kind, *clientAddr);
|
|
_logEntries.pop_back();
|
|
return;
|
|
}
|
|
|
|
if (unlikely(buf.remaining())) {
|
|
LOG_ERROR(_env, "Malformed message. Extra %s bytes for LogsDBResponse %s from %s", buf.remaining(), resp.header.kind, *clientAddr);
|
|
_logEntries.pop_back();
|
|
}
|
|
LOG_DEBUG(_env, "Received response %s for requests id %s from replica id %s", resp.header.kind, resp.header.requestId, resp.replicaId);
|
|
}
|
|
|
|
void _handleLogsDBRequest(int sockIx, struct sockaddr_in* clientAddr, BincodeBuf& buf) {
|
|
LOG_DEBUG(_env, "received LogsDBRequest from %s", *clientAddr);
|
|
|
|
auto replicaId = _getReplicaId(clientAddr);
|
|
|
|
if (replicaId == LogsDB::REPLICA_COUNT) {
|
|
LOG_DEBUG(_env, "We can't match this address to replica. Dropping");
|
|
return;
|
|
}
|
|
|
|
auto& req = _logEntries.emplace_back().setLogsDBRequest();
|
|
req.replicaId = replicaId;
|
|
|
|
try {
|
|
req.header.unpack(buf);
|
|
} catch (const BincodeException& err) {
|
|
LOG_ERROR(_env, "Could not parse LogsDBRequest.header: %s", err.what());
|
|
_logEntries.pop_back();
|
|
return;
|
|
}
|
|
|
|
if (wyhash64(&_packetDropRand) % 10'000 < _incomingPacketDropProbability) {
|
|
LOG_DEBUG(_env, "artificially dropping LogsDBRequest %s", req.header.requestId);
|
|
_logEntries.pop_back();
|
|
return;
|
|
}
|
|
|
|
try {
|
|
req.requestContainer.unpack(buf, req.header.kind);
|
|
} catch (const BincodeException& exc) {
|
|
LOG_ERROR(_env, "Could not parse LogsDBRequest.requestContainer of kind %s: %s", req.header.kind, exc.what());
|
|
_logEntries.pop_back();
|
|
return;
|
|
}
|
|
|
|
if (unlikely(buf.remaining() < 8)) {
|
|
LOG_ERROR(_env, "Could not parse LogsDBRequest of kind %s from %s, message signature is 8 bytes, only %s remaining", req.header.kind, *clientAddr, buf.remaining());
|
|
_logEntries.pop_back();
|
|
return;
|
|
}
|
|
|
|
auto expectedMac = cbcmac(_expandedShardKey, buf.data, buf.cursor - buf.data);
|
|
BincodeFixedBytes<8> receivedMac;
|
|
buf.unpackFixedBytes<8>(receivedMac);
|
|
if (unlikely(expectedMac != receivedMac.data)) {
|
|
LOG_ERROR(_env, "Incorrect signature for LogsDBRequest %s from %s", req.header.kind, *clientAddr);
|
|
_logEntries.pop_back();
|
|
return;
|
|
}
|
|
|
|
if (unlikely(buf.remaining())) {
|
|
LOG_ERROR(_env, "Malformed message. Extra %s bytes for LogsDBRequest %s from %s", buf.remaining(), req.header.kind, *clientAddr);
|
|
_logEntries.pop_back();
|
|
}
|
|
LOG_DEBUG(_env, "Received request %s with requests id %s from replica id %s", req.header.kind, req.header.requestId, req.replicaId);
|
|
}
|
|
|
|
uint8_t _getReplicaId(sockaddr_in* clientAddress) {
|
|
auto replicasPtr = _shared.replicas;
|
|
if (!replicasPtr) {
|
|
return LogsDB::REPLICA_COUNT;
|
|
}
|
|
|
|
for (ReplicaId replicaId = 0; replicaId.u8 < replicasPtr->size(); ++replicaId.u8) {
|
|
auto& replica = replicasPtr->at(replicaId.u8);
|
|
if (replica.addrs[0] == *clientAddress || replica.addrs[1] == *clientAddress) {
|
|
return replicaId.u8;
|
|
}
|
|
}
|
|
return LogsDB::REPLICA_COUNT;
|
|
}
|
|
|
|
void _handleShardRequest(int sockIx, struct sockaddr_in* clientAddr, BincodeBuf& reqBbuf) {
|
|
LOG_DEBUG(_env, "received message from %s", *clientAddr);
|
|
|
|
// First, try to parse the header
|
|
ShardRequestHeader reqHeader;
|
|
try {
|
|
reqHeader.unpack(reqBbuf);
|
|
} catch (const BincodeException& err) {
|
|
LOG_ERROR(_env, "Could not parse: %s", err.what());
|
|
RAISE_ALERT(_env, "could not parse request header from %s, dropping it.", *clientAddr);
|
|
return;
|
|
}
|
|
|
|
if (wyhash64(&_packetDropRand) % 10'000 < _incomingPacketDropProbability) {
|
|
LOG_DEBUG(_env, "artificially dropping request %s", reqHeader.requestId);
|
|
return;
|
|
}
|
|
|
|
auto t0 = eggsNow();
|
|
|
|
LOG_DEBUG(_env, "received request id %s, kind %s, from %s", reqHeader.requestId, reqHeader.kind, *clientAddr);
|
|
|
|
// If this will be filled in with an actual code, it means that we couldn't process
|
|
// the request.
|
|
EggsError err = NO_ERROR;
|
|
|
|
// Now, try to parse the body
|
|
try {
|
|
_reqContainer.unpack(reqBbuf, reqHeader.kind);
|
|
if (bigRequest(reqHeader.kind)) {
|
|
if (unlikely(_env._shouldLog(LogLevel::LOG_TRACE))) {
|
|
LOG_TRACE(_env, "parsed request: %s", _reqContainer);
|
|
} else {
|
|
LOG_DEBUG(_env, "parsed request: <omitted>");
|
|
}
|
|
} else {
|
|
LOG_DEBUG(_env, "parsed request: %s", _reqContainer);
|
|
}
|
|
} catch (const BincodeException& exc) {
|
|
LOG_ERROR(_env, "Could not parse: %s", exc.what());
|
|
RAISE_ALERT(_env, "could not parse request of kind %s from %s, will reply with error.", reqHeader.kind, *clientAddr);
|
|
err = EggsError::MALFORMED_REQUEST;
|
|
}
|
|
|
|
if (likely(err == NO_ERROR)) {
|
|
// authenticate, if necessary
|
|
if (isPrivilegedRequestKind(reqHeader.kind)) {
|
|
if (unlikely(reqBbuf.remaining() < 8)) {
|
|
LOG_ERROR(_env, "Could not parse request of kind %s from %s, message signature is 8 bytes, only %s remaining", reqHeader.kind, *clientAddr, reqBbuf.remaining());
|
|
RAISE_ALERT(_env, "could not parse request of kind %s from %s, will reply with error.", reqHeader.kind, *clientAddr);
|
|
err = EggsError::MALFORMED_REQUEST;
|
|
} else {
|
|
auto expectedMac = cbcmac(_expandedCDCKey, reqBbuf.data, reqBbuf.cursor - reqBbuf.data);
|
|
BincodeFixedBytes<8> receivedMac;
|
|
reqBbuf.unpackFixedBytes<8>(receivedMac);
|
|
if (expectedMac != receivedMac.data) {
|
|
err = EggsError::NOT_AUTHORISED;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Make sure nothing is left
|
|
if (unlikely(err == NO_ERROR && reqBbuf.remaining() != 0)) {
|
|
RAISE_ALERT(_env, "%s bytes remaining after parsing request of kind %s from %s, will reply with error", reqBbuf.remaining(), reqHeader.kind, *clientAddr);
|
|
err = EggsError::MALFORMED_REQUEST;
|
|
}
|
|
|
|
// At this point, if it's a read request, we can process it,
|
|
// if it's a write request we prepare the log entry and
|
|
// send it off.
|
|
if (likely(err == NO_ERROR)) {
|
|
if (readOnlyShardReq(_reqContainer.kind())) {
|
|
err = _shared.shardDB.read(_reqContainer, _respContainer);
|
|
} else {
|
|
auto& entry = _logEntries.emplace_back().setQueuedShardLogEntry();
|
|
entry.sockIx = sockIx;
|
|
entry.clientAddr = *clientAddr;
|
|
entry.receivedAt = t0;
|
|
entry.requestKind = reqHeader.kind;
|
|
entry.requestId = reqHeader.requestId;
|
|
err = _shared.shardDB.prepareLogEntry(_reqContainer, entry.logEntry);
|
|
if (likely(err == NO_ERROR)) {
|
|
return; // we're done here, move along
|
|
} else {
|
|
_logEntries.pop_back(); // back out the log entry
|
|
}
|
|
}
|
|
}
|
|
|
|
Duration elapsed = eggsNow() - t0;
|
|
bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
|
|
packShardResponse(_env, _shared, _sendBuf, _sendHdrs[sockIx], _sendVecs[sockIx], reqHeader.requestId, reqHeader.kind, elapsed, dropArtificially, clientAddr, sockIx, err, _respContainer);
|
|
}
|
|
|
|
public:
|
|
virtual void step() override {
|
|
if (unlikely(!_shared.blockServicesCache.haveBlockServices())) {
|
|
(100_ms).sleepRetry();
|
|
return;
|
|
}
|
|
|
|
_logEntries.clear();
|
|
_sendBuf.clear();
|
|
_channel.clear();
|
|
for (int i = 0; i < 2; i++) {
|
|
_sendHdrs[i].clear();
|
|
_sendVecs[i].clear();
|
|
}
|
|
|
|
if (unlikely(Loop::poll(_shared.socks.data(), 1 + (_shared.socks[1].fd != 0), -1) < 0)) {
|
|
if (errno == EINTR) { return; }
|
|
throw SYSCALL_EXCEPTION("poll");
|
|
}
|
|
|
|
for (int sockIx = 0; sockIx < _shared.socks.size(); sockIx++) {
|
|
const auto& sock = _shared.socks[sockIx];
|
|
if (!(sock.revents & POLLIN)) { continue; }
|
|
|
|
int msgs = recvmmsg(_shared.socks[sockIx].fd, &_recvHdrs[sockIx][0], _recvHdrs[sockIx].size(), MSG_DONTWAIT, nullptr);
|
|
if (unlikely(msgs < 0)) { // we know we have data from poll, we won't get EAGAIN
|
|
throw SYSCALL_EXCEPTION("recvmmsgs");
|
|
}
|
|
LOG_DEBUG(_env, "received %s messages from socket %s", msgs, sockIx);
|
|
|
|
if (msgs > 0) {
|
|
_shared.receivedRequests[sockIx] = _shared.receivedRequests[sockIx]*0.95 + ((double)msgs)*0.05;
|
|
}
|
|
for (int msgIx = 0; msgIx < msgs; msgIx++) {
|
|
_channel.demultiplexMessage(sockIx, _recvHdrs[sockIx][msgIx]);
|
|
}
|
|
}
|
|
|
|
for (auto& msg : _channel.getProtocolMessages(LOG_RESP_PROTOCOL_VERSION)) {
|
|
_handleLogsDBResponse(msg.socketId, msg.clientAddr, msg.buf);
|
|
}
|
|
|
|
for (auto& msg : _channel.getProtocolMessages(LOG_REQ_PROTOCOL_VERSION)) {
|
|
_handleLogsDBRequest(msg.socketId, msg.clientAddr, msg.buf);
|
|
}
|
|
|
|
for (auto& msg : _channel.getProtocolMessages(SHARD_REQ_PROTOCOL_VERSION)) {
|
|
_handleShardRequest(msg.socketId, msg.clientAddr, msg.buf);
|
|
}
|
|
|
|
|
|
// write out write requests to queue
|
|
{
|
|
size_t numLogEntries = _logEntries.size();
|
|
if (numLogEntries > 0) {
|
|
LOG_DEBUG(_env, "pushing %s log entries to writer", numLogEntries);
|
|
uint32_t pushed = _shared.writerRequestsQueue.push(_logEntries);
|
|
_shared.logEntriesQueueSize = _shared.logEntriesQueueSize*0.95 + _shared.writerRequestsQueue.size()*0.05;
|
|
if (pushed < numLogEntries) {
|
|
LOG_INFO(_env, "tried to push %s elements to write queue, but pushed %s instead", numLogEntries, pushed);
|
|
}
|
|
}
|
|
}
|
|
// write out read responses to UDP
|
|
for (int i = 0; i < 2; i++) {
|
|
if (_sendHdrs[i].size() == 0) { continue; }
|
|
LOG_DEBUG(_env, "sending %s read responses to sock %s", _sendHdrs[i].size(), i);
|
|
for (int j = 0; j < _sendHdrs[i].size(); j++) {
|
|
auto& vec = _sendVecs[i][j];
|
|
vec.iov_base = &_sendBuf[(size_t)vec.iov_base];
|
|
auto& hdr = _sendHdrs[i][j];
|
|
hdr.msg_hdr.msg_iov = &vec;
|
|
}
|
|
int ret = sendmmsg(_shared.socks[i].fd, &_sendHdrs[i][0], _sendHdrs[i].size(), 0);
|
|
if (unlikely(ret < 0)) {
|
|
// we get EPERM when nf drops packets
|
|
if (errno == EPERM) {
|
|
LOG_INFO(_env, "we got EPERM when trying to send %s messages, will drop them", _sendHdrs[i].size());
|
|
} else {
|
|
throw SYSCALL_EXCEPTION("sendto");
|
|
}
|
|
} else if (unlikely(ret < _sendHdrs[i].size())) {
|
|
LOG_INFO(_env, "dropping %s out of %s requests since `sendmmsg` could not send them all", _sendHdrs[i].size()-ret, _sendHdrs[i].size());
|
|
}
|
|
}
|
|
}
|
|
};
|
|
|
|
struct ShardWriter : Loop {
|
|
private:
|
|
ShardShared& _shared;
|
|
AES128Key _expandedShardKey;
|
|
uint64_t _currentLogIndex;
|
|
ShardRespContainer _respContainer;
|
|
std::vector<ShardWriterRequest> _requests;
|
|
const size_t _maxWritesAtOnce;
|
|
|
|
std::vector<QueuedShardLogEntry> _logEntries;
|
|
|
|
std::unique_ptr<LogsDB> _logsDB;
|
|
const bool _dontDoReplication;
|
|
std::vector<LogsDBRequest> _logsDBRequests;
|
|
std::vector<LogsDBResponse> _logsDBResponses;
|
|
std::vector<LogsDBRequest *> _logsDBOutRequests;
|
|
std::vector<LogsDBResponse> _logsDBOutResponses;
|
|
std::unordered_map<uint64_t, QueuedShardLogEntry> _inFlightEntries;
|
|
std::vector<QueuedShardLogEntry> _outgoingLogEntries;
|
|
std::shared_ptr<std::array<ReplicaAddrsInfo, LogsDB::REPLICA_COUNT>> _replicaInfo;
|
|
|
|
// sendmmsg data (one per socket)
|
|
std::vector<char> _sendBuf;
|
|
std::array<std::vector<struct mmsghdr>, 2> _sendHdrs; // one per socket
|
|
std::array<std::vector<struct iovec>, 2> _sendVecs;
|
|
|
|
uint64_t _packetDropRand;
|
|
uint64_t _incomingPacketDropProbability; // probability * 10,000
|
|
uint64_t _outgoingPacketDropProbability; // probability * 10,000
|
|
|
|
virtual void sendStop() override {
|
|
_shared.writerRequestsQueue.close();
|
|
}
|
|
|
|
public:
|
|
ShardWriter(Logger& logger, std::shared_ptr<XmonAgent>& xmon, ShardReplicaId shrid, const ShardOptions& options, ShardShared& shared) :
|
|
Loop(logger, xmon, "writer"),
|
|
_shared(shared),
|
|
_maxWritesAtOnce(options.writeToLogsDB ? LogsDB::IN_FLIGHT_APPEND_WINDOW * 10 : MAX_WRITES_AT_ONCE),
|
|
_dontDoReplication(options.dontDoReplication),
|
|
_packetDropRand(eggsNow().ns),
|
|
_incomingPacketDropProbability(0),
|
|
_outgoingPacketDropProbability(0)
|
|
{
|
|
expandKey(ShardKey, _expandedShardKey);
|
|
_currentLogIndex = _shared.shardDB.lastAppliedLogEntry();
|
|
auto convertProb = [this](const std::string& what, double prob, uint64_t& iprob) {
|
|
if (prob != 0.0) {
|
|
LOG_INFO(_env, "Will drop %s%% of %s packets", prob*100.0, what);
|
|
iprob = prob * 10'000.0;
|
|
ALWAYS_ASSERT(iprob > 0 && iprob < 10'000);
|
|
}
|
|
};
|
|
convertProb("incoming", options.simulateIncomingPacketDrop, _incomingPacketDropProbability);
|
|
convertProb("outgoing", options.simulateOutgoingPacketDrop, _outgoingPacketDropProbability);
|
|
_requests.reserve(_maxWritesAtOnce);
|
|
|
|
if (options.writeToLogsDB) {
|
|
_logsDB.reset(new LogsDB(_env,_shared.sharedDB,shrid.replicaId(), _currentLogIndex, options.dontDoReplication, options.forceLeader, options.avoidBeingLeader, options.forcedLastReleased));
|
|
_logsDB->processIncomingMessages(_logsDBRequests, _logsDBResponses);
|
|
_shared.isLeader.store(_logsDB->isLeader(), std::memory_order_relaxed);
|
|
} else {
|
|
LogsDB::clearAllData(shared.sharedDB);
|
|
_shared.isLeader.store(shrid.replicaId() == 0);
|
|
}
|
|
}
|
|
|
|
virtual ~ShardWriter() = default;
|
|
|
|
void logsDBStep() {
|
|
LOG_DEBUG(_env, "Calling LogsDB processIncomingMessages with %s requests and %s responses", _logsDBRequests.size(), _logsDBResponses.size());
|
|
_logsDB->processIncomingMessages(_logsDBRequests,_logsDBResponses);
|
|
_shared.isLeader.store(_logsDB->isLeader(), std::memory_order_relaxed);
|
|
|
|
bool droppedDueToInFlightWindow = false;
|
|
|
|
// If we are leader write any outstanding entries
|
|
std::vector<LogsDBLogEntry> logsDBEntries;
|
|
if (!_logsDB->isLeader()) {
|
|
//TODO: we could notify clients we are no longer leader
|
|
if (_logEntries.size() > 0) {
|
|
LOG_INFO(_env, "Received %s shard write requests but we are not log leader. dropping", _logEntries.size());
|
|
_logEntries.clear();
|
|
}
|
|
if (_inFlightEntries.size() > 0) {
|
|
LOG_INFO(_env, "There are %s in flight shard write requests but we are no longer log leader. dropping", _inFlightEntries.size());
|
|
_inFlightEntries.clear();
|
|
}
|
|
} else if (_logEntries.size()) {
|
|
logsDBEntries.reserve(_logEntries.size());
|
|
std::vector<uint8_t> data;
|
|
data.resize(MAX_UDP_MTU);
|
|
logsDBEntries.reserve(_requests.size());
|
|
LogIdx expectedIdx = _currentLogIndex + _inFlightEntries.size();
|
|
for (auto& queuedLogEntry : _logEntries) {
|
|
auto& logsDBEntry = logsDBEntries.emplace_back();;
|
|
queuedLogEntry.logEntry.idx = ++expectedIdx;
|
|
BincodeBuf buf((char*)&data[0], MAX_UDP_MTU);
|
|
queuedLogEntry.logEntry.pack(buf);
|
|
logsDBEntry.value.assign(buf.data, buf.cursor);
|
|
}
|
|
|
|
auto err = _logsDB->appendEntries(logsDBEntries);
|
|
ALWAYS_ASSERT(err == NO_ERROR);
|
|
ALWAYS_ASSERT(_logEntries.size() == logsDBEntries.size());
|
|
for (size_t i = 0; i < _logEntries.size(); ++i) {
|
|
if (logsDBEntries[i].idx == 0) {
|
|
// if we don't wait for replication the window gets cleared immediately
|
|
ALWAYS_ASSERT(!_dontDoReplication);
|
|
droppedDueToInFlightWindow = true;
|
|
LOG_INFO(_env, "Appended %s out of %s shard write requests. Log in flight windows is full. Dropping other entries", i, _logEntries.size());
|
|
break;
|
|
}
|
|
ALWAYS_ASSERT(logsDBEntries[i].idx == _logEntries[i].logEntry.idx);
|
|
_inFlightEntries.emplace(_logEntries[i].logEntry.idx.u64, std::move(_logEntries[i]));
|
|
}
|
|
logsDBEntries.clear();
|
|
|
|
if (_dontDoReplication) {
|
|
// usually the state machine is moved by responses if we don't expect any we move it manually
|
|
_logsDBRequests.clear();
|
|
_logsDBResponses.clear();
|
|
_logsDB->processIncomingMessages(_logsDBRequests, _logsDBResponses);
|
|
}
|
|
}
|
|
|
|
// Log if not active is not chaty but it's messages are higher priority as they make us progress state under high load.
|
|
// We want to have priority when sending out
|
|
_logsDB->getOutgoingMessages(_logsDBOutRequests, _logsDBOutResponses);
|
|
|
|
for (auto& response : _logsDBOutResponses) {
|
|
packLogsDBResponse(response);
|
|
}
|
|
|
|
for (auto request : _logsDBOutRequests) {
|
|
packLogsDBRequest(*request);
|
|
}
|
|
|
|
_logsDB->readEntries(logsDBEntries);
|
|
_outgoingLogEntries.reserve(logsDBEntries.size());
|
|
if (_dontDoReplication && _logsDB->isLeader()) {
|
|
ALWAYS_ASSERT(_inFlightEntries.size() == logsDBEntries.size());
|
|
}
|
|
|
|
for (auto& logsDBEntry : logsDBEntries) {
|
|
++_currentLogIndex;
|
|
ALWAYS_ASSERT(_currentLogIndex == logsDBEntry.idx);
|
|
ALWAYS_ASSERT(logsDBEntry.value.size() > 0);
|
|
BincodeBuf buf((char*)&logsDBEntry.value.front(), logsDBEntry.value.size());
|
|
ShardLogEntry shardEntry;
|
|
shardEntry.unpack(buf);
|
|
ALWAYS_ASSERT(_currentLogIndex == shardEntry.idx);
|
|
auto it = _inFlightEntries.find(shardEntry.idx.u64);
|
|
|
|
if (_dontDoReplication && _logsDB->isLeader()) {
|
|
ALWAYS_ASSERT(it != _inFlightEntries.end());
|
|
ALWAYS_ASSERT(shardEntry == it->second.logEntry);
|
|
}
|
|
|
|
auto err = _shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, _respContainer);
|
|
if (it != _inFlightEntries.end()) {
|
|
auto& logEntry = _outgoingLogEntries.emplace_back(std::move(it->second));
|
|
_inFlightEntries.erase(it);
|
|
|
|
ALWAYS_ASSERT(shardEntry == logEntry.logEntry);
|
|
if (likely(logEntry.requestId)) {
|
|
LOG_DEBUG(_env, "applying log entry for request %s kind %s from %s", logEntry.requestId, logEntry.requestKind, logEntry.clientAddr);
|
|
} else {
|
|
LOG_DEBUG(_env, "applying request-less log entry");
|
|
}
|
|
if (likely(logEntry.requestId)) {
|
|
Duration elapsed = eggsNow() - logEntry.receivedAt;
|
|
bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
|
|
packShardResponse(_env, _shared, _sendBuf, _sendHdrs[logEntry.sockIx], _sendVecs[logEntry.sockIx], logEntry.requestId, logEntry.requestKind, elapsed, dropArtificially, &logEntry.clientAddr, logEntry.sockIx, err, _respContainer);
|
|
} else if (unlikely(err != NO_ERROR)) {
|
|
RAISE_ALERT(_env, "could not apply request-less log entry: %s", err);
|
|
}
|
|
}
|
|
}
|
|
if (unlikely(droppedDueToInFlightWindow)) {
|
|
// We can't check before as we first need to process and remove entries that have been released
|
|
ALWAYS_ASSERT(_inFlightEntries.size() == LogsDB::IN_FLIGHT_APPEND_WINDOW);
|
|
}
|
|
_shared.shardDB.flush(true);
|
|
// not needed as we just flushed and apparently it does actually flush again
|
|
// _logsDB->flush(true);
|
|
|
|
sendMessages();
|
|
}
|
|
|
|
void noLogsDbStep() {
|
|
for (auto& logEntry : _logEntries) {
|
|
if (likely(logEntry.requestId)) {
|
|
LOG_DEBUG(_env, "applying log entry for request %s kind %s from %s", logEntry.requestId, logEntry.requestKind, logEntry.clientAddr);
|
|
} else {
|
|
LOG_DEBUG(_env, "applying request-less log entry");
|
|
}
|
|
|
|
auto err = _shared.shardDB.applyLogEntry(++_currentLogIndex, logEntry.logEntry, _respContainer);
|
|
if (likely(logEntry.requestId)) {
|
|
Duration elapsed = eggsNow() - logEntry.receivedAt;
|
|
bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
|
|
packShardResponse(_env, _shared, _sendBuf, _sendHdrs[logEntry.sockIx], _sendVecs[logEntry.sockIx], logEntry.requestId, logEntry.requestKind, elapsed, dropArtificially, &logEntry.clientAddr, logEntry.sockIx, err, _respContainer);
|
|
} else if (unlikely(err != NO_ERROR)) {
|
|
RAISE_ALERT(_env, "could not apply request-less log entry: %s", err);
|
|
}
|
|
}
|
|
if (_requests.size() > 0) {
|
|
LOG_DEBUG(_env, "flushing and sending %s writes", _requests.size());
|
|
_shared.shardDB.flush(true);
|
|
// important to send all of them after the flush! otherwise it's not durable yet
|
|
sendMessages();
|
|
}
|
|
}
|
|
|
|
// Looks up the known addresses of replica `id` in the replica snapshot held
// by this step (_replicaInfo). Returns nullptr when no replica information
// has been received yet, or when the replica's first address has no IP
// recorded (s_addr == 0). The returned pointer points into *_replicaInfo.
ReplicaAddrsInfo* addressFromReplicaId(ReplicaId id) {
    if (_replicaInfo) {
        auto& candidate = (*_replicaInfo)[id.u8];
        const bool known = candidate.addrs[0].sin_addr.s_addr != 0;
        if (known) {
            return &candidate;
        }
    }
    return nullptr;
}
|
|
|
|
// Serializes a LogsDB response (header, body, then an 8-byte CBC-MAC over
// both) onto the tail of _sendBuf and queues an mmsghdr/iovec pair for it;
// the datagram itself goes out in sendMessages(). The response is dropped
// (with a debug log) when we have no address for the target replica or when
// the artificial outgoing packet drop fires.
void packLogsDBResponse(LogsDBResponse& response) {
    ReplicaAddrsInfo* target = addressFromReplicaId(response.replicaId);
    if (unlikely(target == nullptr)) {
        LOG_DEBUG(_env, "No information for replica id %s. dropping response", response.replicaId);
        return;
    }

    const bool dropIt = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
    if (unlikely(dropIt)) {
        LOG_DEBUG(_env, "artificially dropping response %s", response.header.requestId);
        return;
    }

    // Grow _sendBuf by a full MTU, pack into that space, then shrink back to
    // the number of bytes actually written.
    const size_t offset = _sendBuf.size();
    _sendBuf.resize(offset + MAX_UDP_MTU);
    BincodeBuf out(&_sendBuf[offset], MAX_UDP_MTU);
    response.header.pack(out);
    response.responseContainer.pack(out);
    out.packFixedBytes<8>({cbcmac(_expandedShardKey, out.data, out.cursor - out.data)});
    const auto packedLen = out.len();
    _sendBuf.resize(offset + packedLen);

    // Cheap pseudo-random choice from the clock: bit 0 selects the replica's
    // second address (only if it has a port), bit 1 selects our second
    // socket (only if we have a second IP).
    const auto now = eggsNow();
    const int addrIx = now.ns & !!target->addrs[1].sin_port;
    const int sockIx = (now.ns >> 1) & !!_shared.ips[1];
    auto& dest = target->addrs[addrIx];

    LOG_DEBUG(_env, "will send response for req id %s kind %s to %s", response.header.requestId, response.header.kind, dest);

    // The header/iovec vectors and _sendBuf may still grow (and reallocate)
    // before sending, so iov_base temporarily stores the byte offset into
    // _sendBuf; sendMessages() converts it to a pointer and sets msg_iov.
    auto& hdr = _sendHdrs[sockIx].emplace_back();
    hdr.msg_hdr = {
        .msg_name = (sockaddr_in*)&dest,
        .msg_namelen = sizeof(dest),
        .msg_iovlen = 1,
    };
    hdr.msg_len = packedLen;
    auto& vec = _sendVecs[sockIx].emplace_back();
    vec.iov_base = (void*)offset;
    vec.iov_len = packedLen;
}
|
|
|
|
// Serializes a LogsDB request (header, body, then an 8-byte CBC-MAC over
// both) onto the tail of _sendBuf and queues an mmsghdr/iovec pair for it;
// the datagram itself goes out in sendMessages(). Records the send time in
// request.sentTime. The request is dropped (with a debug log) when we have no
// address for the target replica or when the artificial outgoing packet drop
// fires; in those cases sentTime is left untouched.
void packLogsDBRequest(LogsDBRequest& request) {
    ReplicaAddrsInfo* target = addressFromReplicaId(request.replicaId);
    if (unlikely(target == nullptr)) {
        LOG_DEBUG(_env, "No information for replica id %s. dropping request", request.replicaId);
        return;
    }

    const bool dropIt = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
    if (unlikely(dropIt)) {
        LOG_DEBUG(_env, "artificially dropping request %s", request.header.requestId);
        return;
    }

    // Grow _sendBuf by a full MTU, pack into that space, then shrink back to
    // the number of bytes actually written.
    const size_t offset = _sendBuf.size();
    _sendBuf.resize(offset + MAX_UDP_MTU);
    BincodeBuf out(&_sendBuf[offset], MAX_UDP_MTU);
    request.header.pack(out);
    request.requestContainer.pack(out);
    out.packFixedBytes<8>({cbcmac(_expandedShardKey, out.data, out.cursor - out.data)});
    const auto packedLen = out.len();
    _sendBuf.resize(offset + packedLen);

    // The send timestamp doubles as the randomness source: bit 0 selects the
    // replica's second address (only if it has a port), bit 1 selects our
    // second socket (only if we have a second IP).
    request.sentTime = eggsNow();
    const int addrIx = request.sentTime.ns & !!target->addrs[1].sin_port;
    const int sockIx = (request.sentTime.ns >> 1) & !!_shared.ips[1];
    auto& dest = target->addrs[addrIx];

    LOG_DEBUG(_env, "will send request for req id %s kind %s to %s", request.header.requestId, request.header.kind, dest);

    // The header/iovec vectors and _sendBuf may still grow (and reallocate)
    // before sending, so iov_base temporarily stores the byte offset into
    // _sendBuf; sendMessages() converts it to a pointer and sets msg_iov.
    auto& hdr = _sendHdrs[sockIx].emplace_back();
    hdr.msg_hdr = {
        .msg_name = (sockaddr_in*)&dest,
        .msg_namelen = sizeof(dest),
        .msg_iovlen = 1,
    };
    hdr.msg_len = packedLen;
    auto& vec = _sendVecs[sockIx].emplace_back();
    vec.iov_base = (void*)offset;
    vec.iov_len = packedLen;
}
|
|
|
|
void sendMessages() {
|
|
for (int i = 0; i < _sendHdrs.size(); i++) {
|
|
if (_sendHdrs[i].size() == 0) { continue; }
|
|
LOG_DEBUG(_env, "sending %s messages to socket %s", _sendHdrs[i].size(), i);
|
|
for (int j = 0; j < _sendHdrs[i].size(); j++) {
|
|
auto& vec = _sendVecs[i][j];
|
|
vec.iov_base = &_sendBuf[(size_t)vec.iov_base];
|
|
auto& hdr = _sendHdrs[i][j];
|
|
hdr.msg_hdr.msg_iov = &vec;
|
|
}
|
|
int ret = sendmmsg(_shared.socks[i].fd, &_sendHdrs[i][0], _sendHdrs[i].size(), 0);
|
|
if (unlikely(ret < 0)) {
|
|
// we get this when nf drops packets
|
|
if (errno != EPERM) {
|
|
throw SYSCALL_EXCEPTION("sendto");
|
|
} else {
|
|
LOG_INFO(_env, "dropping %s messages because of EPERM", _sendHdrs[i].size());
|
|
}
|
|
} else if (unlikely(ret < _sendHdrs[i].size())) {
|
|
LOG_INFO(_env, "dropping %s out of %s messages since `sendmmsg` could not send them all", _sendHdrs[i].size()-ret, _sendHdrs[i].size());
|
|
}
|
|
}
|
|
}
|
|
|
|
virtual void step() override {
|
|
_requests.clear();
|
|
_logsDBRequests.clear();
|
|
_logsDBResponses.clear();
|
|
_logsDBOutRequests.clear();
|
|
_logsDBOutResponses.clear();
|
|
_logEntries.clear();
|
|
_outgoingLogEntries.clear();
|
|
_sendBuf.clear();
|
|
for (int i = 0; i < _sendHdrs.size(); i++) {
|
|
_sendHdrs[i].clear();
|
|
_sendVecs[i].clear();
|
|
}
|
|
_replicaInfo = _shared.replicas;
|
|
uint32_t pulled = _shared.writerRequestsQueue.pull(_requests, _maxWritesAtOnce, _logsDB ? _logsDB->getNextTimeout() : -1);
|
|
if (likely(pulled > 0)) {
|
|
LOG_DEBUG(_env, "pulled %s requests from write queue", pulled);
|
|
_shared.pulledWriteRequests = _shared.pulledWriteRequests*0.95 + ((double)pulled)*0.05;
|
|
}
|
|
if (unlikely(_shared.writerRequestsQueue.isClosed())) {
|
|
// queue is closed, stop
|
|
stop();
|
|
return;
|
|
}
|
|
|
|
for(auto& request : _requests) {
|
|
switch (request.kind()) {
|
|
case WriterQueueEntryKind::LOGSDB_REQUEST:
|
|
_logsDBRequests.emplace_back(request.moveLogsDBRequest());
|
|
break;
|
|
case WriterQueueEntryKind::LOGSDB_RESPONSE:
|
|
_logsDBResponses.emplace_back(request.moveLogsDBResponse());
|
|
break;
|
|
case WriterQueueEntryKind::SHARD_LOG_ENTRY:
|
|
_logEntries.emplace_back(request.moveQueuedShardLogEntry());
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (_logsDB) {
|
|
logsDBStep();
|
|
} else {
|
|
noLogsDbStep();
|
|
}
|
|
}
|
|
};
|
|
|
|
struct ShardRegisterer : PeriodicLoop {
|
|
private:
|
|
ShardShared& _shared;
|
|
Stopper _stopper;
|
|
ShardReplicaId _shrid;
|
|
std::string _shuckleHost;
|
|
uint16_t _shucklePort;
|
|
bool _hasSecondIp;
|
|
XmonNCAlert _alert;
|
|
AddrsInfo _info;
|
|
bool _infoLoaded;
|
|
bool _registerCompleted;
|
|
std::array<AddrsInfo, 5> _replicas;
|
|
public:
|
|
ShardRegisterer(Logger& logger, std::shared_ptr<XmonAgent>& xmon, ShardReplicaId shrid, const ShardOptions& options, ShardShared& shared) :
|
|
PeriodicLoop(logger, xmon, "registerer", {1_sec, 1, 1_mins, 0.1}),
|
|
_shared(shared),
|
|
_shrid(shrid),
|
|
_shuckleHost(options.shuckleHost),
|
|
_shucklePort(options.shucklePort),
|
|
_hasSecondIp(options.ipPorts[1].port != 0),
|
|
_infoLoaded(false),
|
|
_registerCompleted(false)
|
|
{}
|
|
|
|
virtual ~ShardRegisterer() = default;
|
|
|
|
void init() {
|
|
_env.updateAlert(_alert, "Waiting to register ourselves for the first time");
|
|
}
|
|
|
|
virtual bool periodicStep() {
|
|
if (unlikely(!_infoLoaded)) {
|
|
uint16_t port1 = _shared.ports[0].load();
|
|
uint16_t port2 = _shared.ports[1].load();
|
|
// Avoid registering with only one port, so that clients can just wait on
|
|
// the first port being ready and they always have both.
|
|
if (port1 == 0 || (_hasSecondIp && port2 == 0)) {
|
|
// shard server isn't up yet
|
|
return false;
|
|
}
|
|
uint32_t ip1 = _shared.ips[0].load();
|
|
uint32_t ip2 = _shared.ips[1].load();
|
|
|
|
uint32_t ip = htonl(ip1);
|
|
memcpy(_info.ip1.data.data(), &ip, 4);
|
|
_info.port1 = port1;
|
|
|
|
ip = htonl(ip2);
|
|
memcpy(_info.ip2.data.data(), &ip, 4);
|
|
_info.port2 = port2;
|
|
|
|
_infoLoaded = true;
|
|
}
|
|
if (likely(_registerCompleted)) {
|
|
std::array<AddrsInfo, 5> replicas;
|
|
LOG_INFO(_env, "Fetching replicas for shardId %s from shuckle", _shrid.shardId());
|
|
const auto [err, errStr] = fetchShardReplicas(_shuckleHost, _shucklePort, 10_sec, _shrid, replicas);
|
|
if (err == EINTR) { return false; }
|
|
if (err) {
|
|
_env.updateAlert(_alert, "Failed getting shard replicas from shuckle: %s", errStr);
|
|
return false;
|
|
}
|
|
if (_info != replicas[_shrid.replicaId().u8]) {
|
|
_env.updateAlert(_alert, "AddrsInfo in shuckle: %s , not matching local AddrsInfo: %s", replicas[_shrid.replicaId().u8], _info);
|
|
return false;
|
|
}
|
|
if (_replicas != replicas) {
|
|
_replicas = replicas;
|
|
auto replicaUpdatePtr = std::make_shared<std::array<ReplicaAddrsInfo, LogsDB::REPLICA_COUNT>>();
|
|
auto& replicaUpdate = *replicaUpdatePtr;
|
|
for (size_t i = 0; i < _replicas.size(); ++i) {
|
|
uint32_t ip;
|
|
memcpy(&ip, _replicas[i].ip1.data.data(), _replicas[i].ip1.data.size());
|
|
replicaUpdate[i].addrs[0].sin_family = AF_INET;
|
|
replicaUpdate[i].addrs[0].sin_addr.s_addr = ip;
|
|
memcpy(&ip, _replicas[i].ip2.data.data(), _replicas[i].ip2.data.size());
|
|
replicaUpdate[i].addrs[1].sin_family = AF_INET;
|
|
replicaUpdate[i].addrs[1].sin_addr.s_addr = ip;
|
|
|
|
replicaUpdate[i].addrs[0].sin_port = htons(_replicas[i].port1);
|
|
replicaUpdate[i].addrs[1].sin_port= htons(_replicas[i].port2);
|
|
}
|
|
std::atomic_exchange(&_shared.replicas, replicaUpdatePtr);
|
|
}
|
|
}
|
|
|
|
LOG_INFO(_env, "Registering ourselves (shard %s, %s) with shuckle", _shrid, _info);
|
|
const auto [err, errStr] = registerShardReplica(_shuckleHost, _shucklePort, 10_sec, _shrid, _shared.isLeader.load(std::memory_order_relaxed), _info);
|
|
if (err == EINTR) { return false; }
|
|
if (err) {
|
|
_env.updateAlert(_alert, "Couldn't register ourselves with shuckle: %s", errStr);
|
|
return false;
|
|
}
|
|
_env.clearAlert(_alert);
|
|
|
|
if (unlikely(!_registerCompleted)){
|
|
_registerCompleted = true;
|
|
// Even though we registered successfully we want to do another loop quickly to fetch replica information
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
};
|
|
|
|
struct ShardBlockServiceUpdater : PeriodicLoop {
|
|
private:
|
|
ShardShared& _shared;
|
|
ShardReplicaId _shrid;
|
|
std::string _shuckleHost;
|
|
uint16_t _shucklePort;
|
|
XmonNCAlert _alert;
|
|
std::vector<BlockServiceInfo> _blockServices;
|
|
std::vector<BlockServiceId> _currentBlockServices;
|
|
public:
|
|
ShardBlockServiceUpdater(Logger& logger, std::shared_ptr<XmonAgent>& xmon, ShardReplicaId shrid, const ShardOptions& options, ShardShared& shared):
|
|
PeriodicLoop(logger, xmon, "bs_updater", {1_sec, 1_mins}),
|
|
_shared(shared),
|
|
_shrid(shrid),
|
|
_shuckleHost(options.shuckleHost),
|
|
_shucklePort(options.shucklePort)
|
|
{
|
|
_env.updateAlert(_alert, "Waiting to fetch block services for the first time");
|
|
}
|
|
|
|
virtual bool periodicStep() override {
|
|
LOG_INFO(_env, "about to fetch block services from %s:%s", _shuckleHost, _shucklePort);
|
|
const auto [err, errStr] = fetchBlockServices(_shuckleHost, _shucklePort, 10_sec, _shrid.shardId(), _blockServices, _currentBlockServices);
|
|
if (err == EINTR) { return false; }
|
|
if (err) {
|
|
_env.updateAlert(_alert, "could not reach shuckle: %s", errStr);
|
|
return false;
|
|
}
|
|
if (_blockServices.empty()) {
|
|
_env.updateAlert(_alert, "got no block services");
|
|
return false;
|
|
}
|
|
|
|
_shared.blockServicesCache.updateCache(_blockServices, _currentBlockServices);
|
|
|
|
LOG_DEBUG(_env, "updated block services");
|
|
|
|
_env.clearAlert(_alert);
|
|
|
|
return true;
|
|
}
|
|
};
|
|
|
|
struct ShardStatsInserter : PeriodicLoop {
|
|
private:
|
|
ShardShared& _shared;
|
|
ShardReplicaId _shrid;
|
|
std::string _shuckleHost;
|
|
uint16_t _shucklePort;
|
|
XmonNCAlert _alert;
|
|
std::vector<Stat> _stats;
|
|
public:
|
|
ShardStatsInserter(Logger& logger, std::shared_ptr<XmonAgent>& xmon, ShardReplicaId shrid, const ShardOptions& options, ShardShared& shared):
|
|
PeriodicLoop(logger, xmon, "stats", {1_mins, 1_hours}),
|
|
_shared(shared),
|
|
_shrid(shrid),
|
|
_shuckleHost(options.shuckleHost),
|
|
_shucklePort(options.shucklePort),
|
|
_alert(XmonAppType::DAYTIME)
|
|
{}
|
|
|
|
virtual ~ShardStatsInserter() = default;
|
|
|
|
virtual bool periodicStep() override {
|
|
for (ShardMessageKind kind : allShardMessageKind) {
|
|
std::ostringstream prefix;
|
|
prefix << "shard." << _shrid << "." << kind;
|
|
_shared.timings[(int)kind].toStats(prefix.str(), _stats);
|
|
_shared.errors[(int)kind].toStats(prefix.str(), _stats);
|
|
}
|
|
LOG_INFO(_env, "inserting stats");
|
|
auto [err, errStr] = insertStats(_shuckleHost, _shucklePort, 10_sec, _stats);
|
|
if (err == EINTR) { return false; }
|
|
_stats.clear();
|
|
if (err == 0) {
|
|
_env.clearAlert(_alert);
|
|
for (ShardMessageKind kind : allShardMessageKind) {
|
|
_shared.timings[(int)kind].reset();
|
|
_shared.errors[(int)kind].reset();
|
|
}
|
|
return true;
|
|
} else {
|
|
_env.updateAlert(_alert, "Could not insert stats: %s", errStr);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
// TODO restore this when we have the functionality to do so
|
|
// virtual void finish() override {
|
|
// LOG_INFO(_env, "insert stats one last time");
|
|
// periodicStep();
|
|
// }
|
|
};
|
|
|
|
struct ShardMetricsInserter : PeriodicLoop {
|
|
private:
|
|
ShardShared& _shared;
|
|
ShardReplicaId _shrid;
|
|
XmonNCAlert _sendMetricsAlert;
|
|
MetricsBuilder _metricsBuilder;
|
|
std::unordered_map<std::string, uint64_t> _rocksDBStats;
|
|
std::array<XmonNCAlert, 2> _sockQueueAlerts;
|
|
XmonNCAlert _writeQueueAlert;
|
|
public:
|
|
ShardMetricsInserter(Logger& logger, std::shared_ptr<XmonAgent>& xmon, ShardReplicaId shrid, ShardShared& shared):
|
|
PeriodicLoop(logger, xmon, "metrics", {1_sec, 1.0, 1_mins, 0.1}),
|
|
_shared(shared),
|
|
_shrid(shrid),
|
|
_sendMetricsAlert(XmonAppType::DAYTIME),
|
|
_sockQueueAlerts({XmonAppType::NEVER, XmonAppType::NEVER}),
|
|
_writeQueueAlert(XmonAppType::NEVER)
|
|
{}
|
|
|
|
virtual ~ShardMetricsInserter() = default;
|
|
|
|
virtual bool periodicStep() {
|
|
_shared.sharedDB.dumpRocksDBStatistics();
|
|
for (int i = 0; i < 2; i++) {
|
|
if (std::ceil(_shared.receivedRequests[i]) >= MAX_RECV_MSGS) {
|
|
_env.updateAlert(_sockQueueAlerts[i], "recv queue for sock %s is full (%s)", i, _shared.receivedRequests[i]);
|
|
} else {
|
|
_env.clearAlert(_sockQueueAlerts[i]);
|
|
}
|
|
}
|
|
if (std::ceil(_shared.logEntriesQueueSize) >= LOG_ENTRIES_QUEUE_SIZE) {
|
|
_env.updateAlert(_writeQueueAlert, "write queue is full (%s)", _shared.logEntriesQueueSize);
|
|
} else {
|
|
_env.clearAlert(_writeQueueAlert);
|
|
}
|
|
auto now = eggsNow();
|
|
for (ShardMessageKind kind : allShardMessageKind) {
|
|
const ErrorCount& errs = _shared.errors[(int)kind];
|
|
for (int i = 0; i < errs.count.size(); i++) {
|
|
uint64_t count = errs.count[i].load();
|
|
if (count == 0) { continue; }
|
|
_metricsBuilder.measurement("eggsfs_shard_requests");
|
|
_metricsBuilder.tag("shard", _shrid);
|
|
_metricsBuilder.tag("kind", kind);
|
|
_metricsBuilder.tag("write", !readOnlyShardReq(kind));
|
|
if (i == 0) {
|
|
_metricsBuilder.tag("error", "NO_ERROR");
|
|
} else {
|
|
_metricsBuilder.tag("error", (EggsError)i);
|
|
}
|
|
_metricsBuilder.fieldU64("count", count);
|
|
_metricsBuilder.timestamp(now);
|
|
}
|
|
}
|
|
{
|
|
_metricsBuilder.measurement("eggsfs_shard_write_queue");
|
|
_metricsBuilder.tag("shard", _shrid);
|
|
_metricsBuilder.fieldFloat("size", _shared.logEntriesQueueSize);
|
|
_metricsBuilder.timestamp(now);
|
|
}
|
|
for (int i = 0; i < _shared.receivedRequests.size(); i++) {
|
|
_metricsBuilder.measurement("eggsfs_shard_received_requests");
|
|
_metricsBuilder.tag("shard", _shrid);
|
|
_metricsBuilder.tag("socket", i);
|
|
_metricsBuilder.fieldFloat("count", _shared.receivedRequests[i]);
|
|
_metricsBuilder.timestamp(now);
|
|
}
|
|
{
|
|
_metricsBuilder.measurement("eggsfs_shard_pulled_write_requests");
|
|
_metricsBuilder.tag("shard", _shrid);
|
|
_metricsBuilder.fieldFloat("count", _shared.pulledWriteRequests);
|
|
_metricsBuilder.timestamp(now);
|
|
}
|
|
{
|
|
_rocksDBStats.clear();
|
|
_shared.sharedDB.rocksDBMetrics(_rocksDBStats);
|
|
for (const auto& [name, value]: _rocksDBStats) {
|
|
_metricsBuilder.measurement("eggsfs_shard_rocksdb");
|
|
_metricsBuilder.tag("shard", _shrid);
|
|
_metricsBuilder.fieldU64(name, value);
|
|
_metricsBuilder.timestamp(now);
|
|
}
|
|
}
|
|
std::string err = sendMetrics(10_sec, _metricsBuilder.payload());
|
|
_metricsBuilder.reset();
|
|
if (err.empty()) {
|
|
LOG_INFO(_env, "Sent metrics to influxdb");
|
|
_env.clearAlert(_sendMetricsAlert);
|
|
return true;
|
|
} else {
|
|
_env.updateAlert(_sendMetricsAlert, "Could not insert metrics: %s", err);
|
|
return false;
|
|
}
|
|
}
|
|
};
|
|
|
|
// Entry point for a shard replica process. It performs these steps in order:
//  - sets up logging, optionally to options.logFile,
//  - sets up the optional xmon agent,
//  - logs the effective options,
//  - opens the shared RocksDB with the ShardDB/LogsDB/BlockServicesCacheDB
//    column families,
//  - spawns the server, writer, registerer and block-service updater loops,
//    plus the optional stats and metrics loops.
// It then blocks until all loops stop (graceful on SIGINT/SIGTERM) and
// closes the databases.
// With writeToLogsDB && dontDoReplication, options.forcedLastReleased is
// overwritten with ShardDB's last applied log entry.
void runShard(ShardReplicaId shrid, const std::string& dbDir, ShardOptions& options) {
    int logOutFd = STDOUT_FILENO;
    if (!options.logFile.empty()) {
        logOutFd = open(options.logFile.c_str(), O_WRONLY|O_CREAT|O_APPEND, 0644);
        if (logOutFd < 0) {
            throw SYSCALL_EXCEPTION("open");
        }
    }
    Logger logger(options.logLevel, logOutFd, options.syslog, true);

    std::shared_ptr<XmonAgent> xmon;
    if (options.xmon) {
        xmon = std::make_shared<XmonAgent>();
    }

    Env env(logger, xmon, "startup");

    {
        LOG_INFO(env, "Running shard %s with options:", shrid);
        LOG_INFO(env, " level = %s", options.logLevel);
        LOG_INFO(env, " logFile = '%s'", options.logFile);
        LOG_INFO(env, " shuckleHost = '%s'", options.shuckleHost);
        LOG_INFO(env, " shucklePort = %s", options.shucklePort);
        for (int i = 0; i < 2; i++) {
            // Log the port belonging to entry i (previously always ipPorts[0]).
            LOG_INFO(env, " port%s = %s", i+1, options.ipPorts[i].port);
            {
                char ip[INET_ADDRSTRLEN];
                uint32_t ipN = options.ipPorts[i].ip;
                LOG_INFO(env, " ownIp%s = %s", i+1, inet_ntop(AF_INET, &ipN, ip, INET_ADDRSTRLEN));
            }
        }
        LOG_INFO(env, " simulateIncomingPacketDrop = %s", options.simulateIncomingPacketDrop);
        LOG_INFO(env, " simulateOutgoingPacketDrop = %s", options.simulateOutgoingPacketDrop);
        LOG_INFO(env, " syslog = %s", (int)options.syslog);
        if (options.writeToLogsDB) {
            LOG_INFO(env, "Using LogsDB with options:");
            LOG_INFO(env, " dontDoReplication = '%s'", (int)options.dontDoReplication);
            LOG_INFO(env, " forceLeader = '%s'", (int)options.forceLeader);
            LOG_INFO(env, " avoidBeingLeader = '%s'", (int)options.avoidBeingLeader);
            LOG_INFO(env, " forcedLastReleased = '%s'", options.forcedLastReleased);
        }
    }

    // Immediately start xmon: we want the database initializing update to
    // be there.
    std::vector<std::unique_ptr<LoopThread>> threads;

    if (xmon) {
        XmonConfig config;
        {
            std::ostringstream ss;
            ss << shrid;
            config.appInstance = "eggsshard" + ss.str();
        }
        config.prod = options.xmonProd;
        config.appType = XmonAppType::CRITICAL;
        threads.emplace_back(LoopThread::Spawn(std::make_unique<Xmon>(logger, xmon, config)));
    }

    // then everything else

    XmonNCAlert dbInitAlert;
    env.updateAlert(dbInitAlert, "initializing database");

    SharedRocksDB sharedDB(logger, xmon);
    sharedDB.registerCFDescriptors(ShardDB::getColumnFamilyDescriptors());
    sharedDB.registerCFDescriptors(LogsDB::getColumnFamilyDescriptors());
    sharedDB.registerCFDescriptors(BlockServicesCacheDB::getColumnFamilyDescriptors());
    rocksdb::Options rocksDBOptions;
    rocksDBOptions.create_if_missing = true;
    rocksDBOptions.create_missing_column_families = true;
    rocksDBOptions.compression = rocksdb::kLZ4Compression;
    rocksDBOptions.bottommost_compression = rocksdb::kZSTD;
    // 1000*256 = 256k open files at once, given that we currently run on a
    // single machine this is appropriate.
    rocksDBOptions.max_open_files = 1000;
    // We batch writes and flush manually.
    rocksDBOptions.manual_wal_flush = true;
    sharedDB.open(rocksDBOptions, dbDir);

    BlockServicesCacheDB blockServicesCache(logger, xmon, sharedDB);

    ShardDB shardDB(logger, xmon, shrid.shardId(), options.transientDeadlineInterval, sharedDB, blockServicesCache);
    env.clearAlert(dbInitAlert);

    if (options.writeToLogsDB && options.dontDoReplication) {
        options.forcedLastReleased.u64 = shardDB.lastAppliedLogEntry();
    }

    ShardShared shared(sharedDB, blockServicesCache, shardDB);

    threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardServer>(logger, xmon, shrid, options, shared)));
    threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardWriter>(logger, xmon, shrid, options, shared)));
    threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardRegisterer>(logger, xmon, shrid, options, shared)));
    threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardBlockServiceUpdater>(logger, xmon, shrid, options, shared)));
    if (options.shuckleStats) {
        threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardStatsInserter>(logger, xmon, shrid, options, shared)));
    }
    if (options.metrics) {
        threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardMetricsInserter>(logger, xmon, shrid, shared)));
    }

    // from this point on termination on SIGINT/SIGTERM will be graceful
    LoopThread::waitUntilStopped(threads);
    threads.clear();

    shardDB.close();
    sharedDB.close();

    LOG_INFO(env, "shard terminating gracefully, bye.");
}
|