Files
ternfs-XTXMarkets/cpp/cdc/CDCDB.hpp
Francesco Mazzoli 8075e99bb6 Graceful shard teardown
See <https://mazzo.li/posts/stopping-linux-threads.html> for tradeoffs
regarding how to terminate threads gracefully.

The goal of this work was for valgrind to work correctly, which in turn
was to investigate #141. It looks like I have succeeded:

    ==2715080== Warning: unimplemented fcntl command: 1036
    ==2715080== 20,052 bytes in 5,013 blocks are definitely lost in loss record 133 of 135
    ==2715080==    at 0x483F013: operator new(unsigned long) (in /usr/lib/valgrind/vgpreload_memcheck-amd64-linux.so)
    ==2715080==    by 0x3B708E: allocate (new_allocator.h:121)
    ==2715080==    by 0x3B708E: allocate (allocator.h:173)
    ==2715080==    by 0x3B708E: allocate (alloc_traits.h:460)
    ==2715080==    by 0x3B708E: _M_allocate (stl_vector.h:346)
    ==2715080==    by 0x3B708E: std::vector<Crc, std::allocator<Crc> >::_M_default_append(unsigned long) (vector.tcc:635)
    ==2715080==    by 0x42BF1C: resize (stl_vector.h:940)
    ==2715080==    by 0x42BF1C: ShardDBImpl::_fileSpans(rocksdb::ReadOptions&, FileSpansReq const&, FileSpansResp&) (shard/ShardDB.cpp:921)
    ==2715080==    by 0x420867: ShardDBImpl::read(ShardReqContainer const&, ShardRespContainer&) (shard/ShardDB.cpp:1034)
    ==2715080==    by 0x3CB3EE: ShardServer::_handleRequest(int, sockaddr_in*, char*, unsigned long) (shard/Shard.cpp:347)
    ==2715080==    by 0x3C8A39: ShardServer::step() (shard/Shard.cpp:405)
    ==2715080==    by 0x40B1E8: run (core/Loop.cpp:67)
    ==2715080==    by 0x40B1E8: startLoop(void*) (core/Loop.cpp:37)
    ==2715080==    by 0x4BEA258: start_thread (in /usr/lib/libpthread-2.33.so)
    ==2715080==    by 0x4D005E2: clone (in /usr/lib/libc-2.33.so)
    ==2715080==
    ==2715080==
    ==2715080== Exit program on first error (--exit-on-first-error=yes)
2024-01-08 15:41:22 +00:00

128 lines
3.4 KiB
C++

#pragma once
#include <unordered_map>
#include "Bincode.hpp"
#include "Msgs.hpp"
#include "Env.hpp"
#include "Shuckle.hpp"
// Thin wrapper that gives CDC transaction ids their own type instead of
// passing bare 64-bit integers around.
struct CDCTxnId {
    uint64_t x;

    // Zero doubles as the "null" id: real txn ids are never zero.
    CDCTxnId() : CDCTxnId(uint64_t{0}) {}
    CDCTxnId(uint64_t id) : x{id} {}

    // Two ids compare equal exactly when their raw values match.
    bool operator==(const CDCTxnId other) const {
        return other.x == x;
    }
    bool operator!=(const CDCTxnId other) const {
        return !(*this == other);
    }
};

// Stream output for a txn id; only declared in this header.
std::ostream& operator<<(std::ostream& out, CDCTxnId id);
// Hash support for CDCTxnId (needed to use it as a key in the unordered
// standard containers): simply hashes the wrapped 64-bit value.
template <>
struct std::hash<CDCTxnId> {
    std::size_t operator()(const CDCTxnId key) const {
        const std::hash<uint64_t> rawHasher{};
        return rawHasher(key.x);
    }
};
// A shard request together with the id of a shard (presumably the shard the
// request is destined for -- confirm against callers).
struct CDCShardReq {
    ShardId shid;
    // Set when this request is exactly the same as the previous one.
    bool repeated;
    ShardReqContainer req;

    // Resets every field: shard id 0, not repeated, empty request container.
    void clear() {
        req.clear();
        repeated = false;
        shid = ShardId(0);
    }
};

// Stream output for a shard request; only declared in this header.
std::ostream& operator<<(std::ostream& out, const CDCShardReq& x);
// Outcome of a finished transaction: either an error or a response.
struct CDCFinished {
    // Anything other than NO_ERROR means `resp` below has not been filled in.
    EggsError err;
    // Only meaningful when `err` is NO_ERROR.
    CDCRespContainer resp;
};

// Stream output for a finished-transaction record; only declared in this header.
std::ostream& operator<<(std::ostream& out, const CDCFinished& x);
// What came out of advancing the CDC state: which transactions are done, and
// which ones need another shard request.
struct CDCStep {
    // Transactions that have finished, paired with their outcome.
    std::vector<std::pair<CDCTxnId, CDCFinished>> finishedTxns;
    // Transactions that need a new shard request, paired with that request.
    std::vector<std::pair<CDCTxnId, CDCShardReq>> runningTxns;

    // Empties both lists.
    void clear() {
        runningTxns.clear();
        finishedTxns.clear();
    }
};

// Stream output for a step; only declared in this header.
std::ostream& operator<<(std::ostream& out, const CDCStep& x);
// A shard's response, tagged with the transaction it belongs to.
struct CDCShardResp {
    // Id of the transaction this response is for.
    CDCTxnId txnId;
    // Anything other than NO_ERROR means `resp` below has not been filled in.
    EggsError err;
    // Only meaningful when `err` is NO_ERROR.
    ShardRespContainer resp;
};

// Stream output for a shard response; only declared in this header.
std::ostream& operator<<(std::ostream& out, const CDCShardResp& x);
// Handle to the CDC database. The actual state lives behind the opaque
// `_impl` pointer; its concrete type is not visible in this header.
struct CDCDB {
private:
    void* _impl;

public:
    CDCDB() = delete;

    // Copying is disabled in both forms. A copy would share the same raw
    // `_impl` pointer with the original, and the destructor would then run
    // once for each of the two objects. Deleting only the assignment operator
    // is not enough: the copy constructor would still be implicitly generated.
    CDCDB(const CDCDB&) = delete;
    CDCDB& operator=(const CDCDB&) = delete;

    CDCDB(Logger& env, std::shared_ptr<XmonAgent>& xmon, const std::string& path);
    ~CDCDB();

    // NOTE(review): how close() relates to the destructor is not visible from
    // this header -- confirm against the implementation before relying on it.
    void close();

    // Unlike with ShardDB, we don't have an explicit log preparation step here,
    // because at least for now logs are simply either CDC requests, or shard
    // responses.
    //
    // The functions below cannot be called concurrently.

    // Gives you what to do when the CDC is started back up. It'll basically just
    // tell you to send some requests to shards. You need to run this when starting
    // up before proceeding.
    void bootstrap(
        bool sync,
        uint64_t logIndex,
        CDCStep& step
    );

    // Enqueues some CDC requests, and immediately starts them if possible.
    // Returns the txn id that was assigned to each request.
    //
    // Also, advances the CDC state using some shard responses.
    //
    // This function crashes hard if the caller passes it a response it's not
    // expecting. So the caller should track responses and make sure only relevant
    // ones are passed in.
    void update(
        bool sync,
        uint64_t logIndex,
        const std::vector<CDCReqContainer>& cdcReqs,
        const std::vector<CDCShardResp>& shardResps,
        CDCStep& step,
        // Output txn ids for all the requests, same length as `cdcReqs`.
        std::vector<CDCTxnId>& cdcReqsTxnIds
    );

    // The index of the last log entry persisted to the DB
    uint64_t lastAppliedLogEntry();

    // Presumably fills `values` with named RocksDB metrics -- confirm against
    // the implementation.
    void rocksDBMetrics(std::unordered_map<std::string, uint64_t>& values);

    // Presumably dumps RocksDB statistics somewhere (log or file) -- the
    // destination is not visible from this header.
    void dumpRocksDBStatistics();
};