shard: separate read thread

This commit is contained in:
Miroslav Crnic
2024-11-06 08:58:44 +00:00
committed by GitHub Enterprise
parent 3427d89ae9
commit edd5a37a82

View File

@@ -178,6 +178,15 @@ private:
std::variant<LogsDBRequest, LogsDBResponse, QueuedShardLogEntry, SnapshotRequest> _data;
};
struct ShardReaderRequest {
ShardReqMsg msg;
uint64_t requestId;
EggsTime receivedAt;
IpPort clientAddr;
int sockIx; // which sock to use to reply
bool checkPointed;
};
struct ShardShared {
const ShardOptions& options;
SharedRocksDB& sharedDB;
@@ -189,8 +198,11 @@ struct ShardShared {
std::array<ErrorCount, maxShardMessageKind+1> errors;
SPSC<ShardWriterRequest> writerRequestsQueue;
std::atomic<double> logEntriesQueueSize;
SPSC<ShardReaderRequest> readerRequestsQueue;
std::atomic<double> readerRequestQueueSize;
std::array<std::atomic<double>, 2> receivedRequests; // how many requests we got at once from each socket
std::atomic<double> pulledWriteRequests; // how many requests we got from write queue
std::atomic<double> pulledReadRequests; // how many requests we got from read queue
std::shared_ptr<std::array<AddrsInfo, LogsDB::REPLICA_COUNT>> replicas;
std::atomic<bool> isLeader;
@@ -204,6 +216,8 @@ struct ShardShared {
socks({std::move(sock)}),
writerRequestsQueue(LOG_ENTRIES_QUEUE_SIZE),
logEntriesQueueSize(0),
readerRequestsQueue(LOG_ENTRIES_QUEUE_SIZE),
readerRequestQueueSize(0),
pulledWriteRequests(0)
{
for (ShardMessageKind kind : allShardMessageKind) {
@@ -350,6 +364,9 @@ private:
// log entries buffers
std::vector<ShardWriterRequest> _logEntries;
// read requests buffer
std::vector<ShardReaderRequest> _readRequests;
std::unique_ptr<UDPReceiver<1>> _receiver;
std::unique_ptr<UDPSender> _sender;
std::unique_ptr<MultiplexedChannel<4, std::array<uint32_t, 4>{SHARD_REQ_PROTOCOL_VERSION, SHARD_CHECK_POINTED_REQ_PROTOCOL_VERSION, LOG_REQ_PROTOCOL_VERSION, LOG_RESP_PROTOCOL_VERSION}>> _channel;
@@ -482,14 +499,14 @@ private:
LOG_DEBUG(_env, "parsed request: %s", req);
}
// At this point, if it's a read request, we can process it,
// if it's a write request we prepare the log entry and
// send it off.
ShardRespMsg respContainer;
ShardCheckPointedRespMsg checkpointedRespContainer;
checkpointedRespContainer.id = respContainer.id = req.id;
if (readOnlyShardReq(req.body.kind())) {
checkpointedRespContainer.body.checkPointIdx = _shared.shardDB.read(req.body, checkPointed ? checkpointedRespContainer.body.resp : respContainer.body);
auto& readerRequest = _readRequests.emplace_back();
readerRequest.requestId = req.id;
readerRequest.clientAddr = msg.clientAddr;
readerRequest.sockIx = msg.socketIx;
readerRequest.receivedAt = t0;
readerRequest.msg = std::move(req);
readerRequest.checkPointed = checkPointed;
} else if (unlikely(req.body.kind() == ShardMessageKind::SHARD_SNAPSHOT)) {
auto& entry = _logEntries.emplace_back().setSnapshotRequest();
entry.sockIx = msg.socketIx;
@@ -497,7 +514,6 @@ private:
entry.receivedAt = t0;
entry.snapshotId = req.body.getShardSnapshot().snapshotId;
entry.requestId = req.id;
return;
} else {
auto& entry = _logEntries.emplace_back().setQueuedShardLogEntry();
entry.sockIx = msg.socketIx;
@@ -507,24 +523,24 @@ private:
entry.requestId = req.id;
entry.checkPointed = checkPointed;
auto err = _shared.shardDB.prepareLogEntry(req.body, entry.logEntry);
if (likely(err == EggsError::NO_ERROR)) {
return; // we're done here, move along
} else {
LOG_ERROR(_env, "error preparing log entry for request: %s from: %s err: %s", req, entry.clientAddr, err);
(checkPointed ? checkpointedRespContainer.body.resp : respContainer.body).setError() = err;
if (unlikely(err != EggsError::NO_ERROR)) {
_logEntries.pop_back(); // back out the log entry
LOG_ERROR(_env, "error preparing log entry for request: %s from: %s err: %s", req, entry.clientAddr, err);
Duration elapsed = eggsNow() - t0;
bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
if (checkPointed) {
ShardCheckPointedRespMsg checkpointedRespContainer;
checkpointedRespContainer.id = req.id;
checkpointedRespContainer.body.resp.setError() = err;
packCheckPointedShardResponse(_env, _shared, _shared.sock().addr(), *_sender, elapsed, dropArtificially, msg.clientAddr, msg.socketIx, req.body.kind(), checkpointedRespContainer, _expandedCDCKey);
} else {
ShardRespMsg respContainer;
respContainer.id = req.id;
respContainer.body.setError() = err;
packShardResponse(_env, _shared, _shared.sock().addr(), *_sender, elapsed, dropArtificially, msg.clientAddr, msg.socketIx, req.body.kind(), respContainer);
}
}
}
Duration elapsed = eggsNow() - t0;
bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
if (checkPointed) {
packCheckPointedShardResponse(_env, _shared, _shared.sock().addr(), *_sender, elapsed, dropArtificially, msg.clientAddr, msg.socketIx, req.body.kind(), checkpointedRespContainer, _expandedCDCKey);
} else {
packShardResponse(_env, _shared, _shared.sock().addr(), *_sender, elapsed, dropArtificially, msg.clientAddr, msg.socketIx, req.body.kind(), respContainer);
}
}
public:
@@ -535,6 +551,7 @@ public:
}
_logEntries.clear();
_readRequests.clear();
if (unlikely(!_channel->receiveMessages(_env, _shared.socks, *_receiver))) {
return;
@@ -574,7 +591,19 @@ public:
}
}
}
// write out read responses to UDP
// write out read requests to queue
{
size_t numReadRequests = _readRequests.size();
if (numReadRequests > 0) {
LOG_DEBUG(_env, "pushing %s read requests to reader", numReadRequests);
uint32_t pushed = _shared.readerRequestsQueue.push(_readRequests);
_shared.readerRequestQueueSize = _shared.readerRequestQueueSize*0.95 + _shared.readerRequestsQueue.size()*0.05;
if (pushed < numReadRequests) {
LOG_INFO(_env, "tried to push %s elements to reader queue, but pushed %s instead", numReadRequests, pushed);
}
}
}
// write out errors to UDP socket
_sender->sendMessages(_env, _shared.sock());
}
};
@@ -888,6 +917,80 @@ public:
}
};
struct ShardReader : Loop {
private:
ShardShared& _shared;
AES128Key _expandedShardKey;
AES128Key _expandedCDCKey;
ShardRespMsg _respContainer;
ShardCheckPointedRespMsg _checkPointedrespContainer;
std::vector<ShardReaderRequest> _requests;
UDPSender _sender;
uint64_t _packetDropRand;
uint64_t _outgoingPacketDropProbability; // probability * 10,000
virtual void sendStop() override {
_shared.readerRequestsQueue.close();
}
public:
ShardReader(Logger& logger, std::shared_ptr<XmonAgent>& xmon, ShardShared& shared) :
Loop(logger, xmon, "reader"),
_shared(shared),
_sender(UDPSenderConfig{.maxMsgSize = MAX_UDP_MTU}),
_packetDropRand(eggsNow().ns),
_outgoingPacketDropProbability(0)
{
expandKey(ShardKey, _expandedShardKey);
expandKey(CDCKey, _expandedCDCKey);
auto convertProb = [this](const std::string& what, double prob, uint64_t& iprob) {
if (prob != 0.0) {
LOG_INFO(_env, "Will drop %s%% of %s packets", prob*100.0, what);
iprob = prob * 10'000.0;
ALWAYS_ASSERT(iprob > 0 && iprob < 10'000);
}
};
convertProb("outgoing", _shared.options.simulateOutgoingPacketDrop, _outgoingPacketDropProbability);
_requests.reserve(MAX_RECV_MSGS * 2);
}
virtual ~ShardReader() = default;
virtual void step() override {
_requests.clear();
uint32_t pulled = _shared.readerRequestsQueue.pull(_requests, MAX_RECV_MSGS * 2);
auto start = eggsNow();
if (likely(pulled > 0)) {
LOG_DEBUG(_env, "pulled %s requests from read queue", pulled);
_shared.pulledReadRequests = _shared.pulledReadRequests*0.95 + ((double)pulled)*0.05;
}
if (unlikely(_shared.readerRequestsQueue.isClosed())) {
// queue is closed, stop
stop();
return;
}
ShardRespMsg respContainer;
ShardCheckPointedRespMsg checkpointedRespContainer;
for(auto& req : _requests) {
checkpointedRespContainer.id = respContainer.id = req.requestId;
ALWAYS_ASSERT(readOnlyShardReq(req.msg.body.kind()));
checkpointedRespContainer.body.checkPointIdx = _shared.shardDB.read(req.msg.body, req.checkPointed ? checkpointedRespContainer.body.resp : respContainer.body);
Duration elapsed = eggsNow() - req.receivedAt;
bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
if (req.checkPointed) {
packCheckPointedShardResponse(_env, _shared, _shared.sock().addr(), _sender, elapsed, dropArtificially, req.clientAddr, req.sockIx, req.msg.body.kind(), checkpointedRespContainer, _expandedCDCKey);
} else {
packShardResponse(_env, _shared, _shared.sock().addr(), _sender, elapsed, dropArtificially, req.clientAddr, req.sockIx, req.msg.body.kind(), respContainer);
}
}
_sender.sendMessages(_env, _shared.sock());
}
};
struct ShardRegisterer : PeriodicLoop {
private:
ShardShared& _shared;
@@ -1185,6 +1288,12 @@ public:
_metricsBuilder.fieldFloat("size", _shared.logEntriesQueueSize);
_metricsBuilder.timestamp(now);
}
{
_metricsBuilder.measurement("eggsfs_shard_read_queue");
_metricsBuilder.tag("shard", _shrid);
_metricsBuilder.fieldFloat("size", _shared.readerRequestQueueSize);
_metricsBuilder.timestamp(now);
}
for (int i = 0; i < _shared.receivedRequests.size(); i++) {
_metricsBuilder.measurement("eggsfs_shard_received_requests");
_metricsBuilder.tag("shard", _shrid);
@@ -1198,6 +1307,12 @@ public:
_metricsBuilder.fieldFloat("count", _shared.pulledWriteRequests);
_metricsBuilder.timestamp(now);
}
{
_metricsBuilder.measurement("eggsfs_shard_pulled_read_requests");
_metricsBuilder.tag("shard", _shrid);
_metricsBuilder.fieldFloat("count", _shared.pulledReadRequests);
_metricsBuilder.timestamp(now);
}
{
_rocksDBStats.clear();
_shared.sharedDB.rocksDBMetrics(_rocksDBStats);
@@ -1299,6 +1414,7 @@ void runShard(ShardOptions& options) {
ShardShared shared(options, sharedDB, blockServicesCache, shardDB, logsDB, UDPSocketPair(env, options.shardAddrs));
threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardServer>(logger, xmon, shared)));
threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardReader>(logger, xmon, shared)));
threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardWriter>(logger, xmon, shared)));
threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardRegisterer>(logger, xmon, shared)));
threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardBlockServiceUpdater>(logger, xmon, shared)));