shard: allow initiating spans at non-default locations

This commit is contained in:
Miroslav Crnic
2024-08-23 09:13:52 +00:00
parent d277bc40e5
commit 452ff75ea0
11 changed files with 323 additions and 85 deletions
+8 -8
View File
@@ -121,7 +121,7 @@ static std::pair<int, std::string> readShuckleResponse(int fd, ShuckleRespContai
return {};
}
std::pair<int, std::string> fetchBlockServices(const std::string& addr, uint16_t port, Duration timeout, ShardId shid, std::vector<BlockServiceInfo>& blockServices, std::vector<BlockServiceId>& currentBlockServices) {
std::pair<int, std::string> fetchBlockServices(const std::string& addr, uint16_t port, Duration timeout, ShardId shid, std::vector<BlockServiceInfo>& blockServices, std::vector<BlockServiceInfoShort>& currentBlockServices) {
blockServices.clear();
currentBlockServices.clear();
@@ -153,7 +153,7 @@ std::pair<int, std::string> fetchBlockServices(const std::string& addr, uint16_t
// current block services
{
ShuckleReqContainer reqContainer;
auto& req = reqContainer.setShardBlockServicesDEPRECATED();
auto& req = reqContainer.setShardBlockServices();
req.shardId = shid;
{
const auto [err, errStr] = writeShuckleRequest(sock.get(), reqContainer, timeout);
@@ -166,7 +166,7 @@ std::pair<int, std::string> fetchBlockServices(const std::string& addr, uint16_t
if (err) { FAIL(err, errStr); }
}
currentBlockServices = respContainer.getShardBlockServicesDEPRECATED().blockServices.els;
currentBlockServices = respContainer.getShardBlockServices().blockServices.els;
}
// check that all current block services are known -- there's a small race here
@@ -184,14 +184,14 @@ std::pair<int, std::string> fetchBlockServices(const std::string& addr, uint16_t
for (auto storageClass : {HDD_STORAGE, FLASH_STORAGE}) {
fdSet.clear();
for (BlockServiceId bsId : currentBlockServices) {
if (bsIdToBlockService[bsId.u64]->storageClass != storageClass) { continue; }
if (!knownBlockServices.contains(bsId.u64)) {
for (BlockServiceInfoShort bs : currentBlockServices) {
if (bs.storageClass != storageClass) { continue; }
if (!knownBlockServices.contains(bs.id.u64)) {
std::stringstream ss;
ss << "got unknown block service " << bsId << " in current block services, was probably added in the meantime, please retry";
ss << "got unknown block service " << bs.id << " in current block services, was probably added in the meantime, please retry";
FAIL(EIO, ss.str());
}
auto fdName = std::string((const char*)bsIdToBlockService[bsId.u64]->failureDomain.name.data.data(), bsIdToBlockService[bsId.u64]->failureDomain.name.data.size());
auto fdName = std::string((const char*)bs.failureDomain.name.data.data(), bs.failureDomain.name.data.size());
if (!fdSet.insert(fdName).second) {
std::stringstream ss;
ss << "got multiple block services in the same failure domain: " << fdName;
+1 -1
View File
@@ -20,7 +20,7 @@ std::pair<int, std::string> fetchBlockServices(
Duration timeout,
ShardId shid,
std::vector<BlockServiceInfo>& blockServices,
std::vector<BlockServiceId>& currentBlockServices
std::vector<BlockServiceInfoShort>& currentBlockServices
);
std::pair<int, std::string> registerShard(
+97 -30
View File
@@ -1,6 +1,10 @@
#include <rocksdb/db.h>
#include <vector>
#include "Bincode.hpp"
#include "BlockServicesCacheDB.hpp"
#include "Msgs.hpp"
#include "MsgsGen.hpp"
#include "RocksDBUtils.hpp"
enum class BlockServicesCacheKey : uint8_t {
@@ -16,41 +20,94 @@ inline rocksdb::Slice blockServicesCacheKey(const BlockServicesCacheKey* k) {
return rocksdb::Slice((const char*)k, sizeof(*k));
}
struct CurrentBlockServicesBody {
struct BlockServiceInfoShortBody {
FIELDS(
LE, uint8_t, length, setLength,
LE, uint64_t, blockServiceId, setBlockServiceId,
LE, uint8_t, locationId, setLocationId,
LE, uint8_t, storageClass, setStorageClass,
EMIT_OFFSET, MIN_SIZE,
END
)
void checkSize(size_t sz) {
ALWAYS_ASSERT(sz >= MIN_SIZE, "sz < MIN_SIZE (%s < %s)", sz, MIN_SIZE);
ALWAYS_ASSERT(sz == size(), "sz != size() (%s, %s)", sz, size());
BlockServiceInfoShortBody(char* data): _data(data) {}
static size_t calcSize(const BlockServiceInfoShort& blockServiceInfo) {
return SIZE;
}
static size_t calcSize(uint64_t numBlockServices) {
ALWAYS_ASSERT(numBlockServices < 256);
return MIN_SIZE + numBlockServices*sizeof(uint64_t);
void afterAlloc(const BlockServiceInfoShort& blockServiceInfo) {
setBlockServiceId(blockServiceInfo.id.u64);
setLocationId(blockServiceInfo.locationId);
setStorageClass(blockServiceInfo.storageClass);
BincodeBuf buf(_data + MIN_SIZE, FailureDomain::STATIC_SIZE);
blockServiceInfo.failureDomain.pack(buf);
}
void afterAlloc(uint64_t numBlockServices) {
setLength(numBlockServices);
FailureDomain failureDomain() const {
BincodeBuf buf(_data + MIN_SIZE, FailureDomain::STATIC_SIZE);
FailureDomain fd;
fd.unpack(buf);
return fd;
}
size_t size() const {
return MIN_SIZE + length()*sizeof(uint64_t);
return SIZE;
}
static constexpr size_t SIZE = MIN_SIZE + FailureDomain::STATIC_SIZE;
};
struct CurrentBlockServicesBody {
FIELDS(
// if true (>0) then we have body in old version which is just []BlockServiceId
// otherwise it's new version which is length followed by []BlockServiceInfoShortBody
LE, uint8_t, oldVersion, _setOldVersion,
EMIT_OFFSET, MIN_SIZE_V0,
LE, uint8_t, _length, _setLength,
EMIT_OFFSET, MIN_SIZE_V1,
END
)
uint8_t length() const {
return oldVersion() ? oldVersion() : _length();
}
uint64_t at(uint64_t ix) const {
void checkSize(size_t sz) {
ALWAYS_ASSERT(sz >= MIN_SIZE_V0, "sz < MIN_SIZE (%s < %s)", sz, MIN_SIZE_V0);
ALWAYS_ASSERT(sz == size(), "sz != size() (%s, %s)", sz, size());
}
static size_t calcSize(const std::vector<BlockServiceInfoShort>& blockServices) {
ALWAYS_ASSERT(blockServices.size() < 256);
return MIN_SIZE_V1 + blockServices.size()*BlockServiceInfoShortBody::SIZE;
}
void afterAlloc(const std::vector<BlockServiceInfoShort>& blockServices) {
_setOldVersion(false);
_setLength(blockServices.size());
for(size_t i = 0; i < blockServices.size(); ++i) {
blockServiceInfoAt(i).afterAlloc(blockServices[i]);
}
}
size_t size() const {
if (oldVersion()) {
return MIN_SIZE_V0 + length()*sizeof(uint64_t);
}
return MIN_SIZE_V1 + length() * BlockServiceInfoShortBody::SIZE;
}
uint64_t blockIdAt(uint64_t ix) const {
ALWAYS_ASSERT(oldVersion());
ALWAYS_ASSERT(ix < length());
uint64_t v;
memcpy(&v, _data + MIN_SIZE + (ix*sizeof(uint64_t)), sizeof(uint64_t));
memcpy(&v, _data + MIN_SIZE_V0 + (ix*sizeof(uint64_t)), sizeof(uint64_t));
return v;
}
void set(uint64_t ix, uint64_t v) {
BlockServiceInfoShortBody blockServiceInfoAt(uint64_t ix) const {
ALWAYS_ASSERT(!oldVersion());
ALWAYS_ASSERT(ix < length());
memcpy(_data + MIN_SIZE + (ix*sizeof(uint64_t)), &v, sizeof(uint64_t));
return BlockServiceInfoShortBody(_data + MIN_SIZE_V1 + ix *BlockServiceInfoShortBody::SIZE);
}
};
@@ -128,18 +185,11 @@ BlockServicesCacheDB::BlockServicesCacheDB(Logger& logger, std::shared_ptr<XmonA
if (!keyExists(_blockServicesCF, blockServicesCacheKey(&CURRENT_BLOCK_SERVICES_KEY))) {
LOG_INFO(_env, "initializing current block services (as empty)");
OwnedValue<CurrentBlockServicesBody> v(0);
v().setLength(0);
OwnedValue<CurrentBlockServicesBody> v(std::vector<BlockServiceInfoShort>{});
ROCKS_DB_CHECKED(_db->Put({}, _blockServicesCF, blockServicesCacheKey(&CURRENT_BLOCK_SERVICES_KEY), v.toSlice()));
} else {
LOG_INFO(_env, "initializing block services cache (from db)");
std::string buf;
ROCKS_DB_CHECKED(_db->Get({}, _blockServicesCF, blockServicesCacheKey(&CURRENT_BLOCK_SERVICES_KEY), &buf));
ExternalValue<CurrentBlockServicesBody> v(buf);
_currentBlockServices.resize(v().length());
for (int i = 0; i < v().length(); i++) {
_currentBlockServices[i] = v().at(i);
}
{
rocksdb::ReadOptions options;
static_assert(sizeof(BlockServicesCacheKey) == sizeof(uint8_t));
@@ -166,6 +216,25 @@ BlockServicesCacheDB::BlockServicesCacheDB(Logger& logger, std::shared_ptr<XmonA
}
ROCKS_DB_CHECKED(it->status());
}
ROCKS_DB_CHECKED(_db->Get({}, _blockServicesCF, blockServicesCacheKey(&CURRENT_BLOCK_SERVICES_KEY), &buf));
ExternalValue<CurrentBlockServicesBody> v(buf);
_currentBlockServices.resize(v().length());
for (int i = 0; i < v().length(); i++) {
auto& current = _currentBlockServices[i];
if (v().oldVersion()) {
current.id = v().blockIdAt(i);
auto& blockServiceInfo = _blockServices[current.locationId];
current.locationId = DEFAULT_LOCATION;
current.failureDomain.name = blockServiceInfo.failureDomain;
current.storageClass = blockServiceInfo.storageClass;
} else {
auto blockServiceInfo = v().blockServiceInfoAt(i);
current.id = blockServiceInfo.blockServiceId();
current.locationId = blockServiceInfo.locationId();
current.storageClass = blockServiceInfo.storageClass();
current.failureDomain = blockServiceInfo.failureDomain();
}
}
_haveBlockServices = true;
}
}
@@ -174,7 +243,7 @@ BlockServicesCache BlockServicesCacheDB::getCache() const {
return BlockServicesCache(_mutex, _blockServices, _currentBlockServices);
}
void BlockServicesCacheDB::updateCache(const std::vector<BlockServiceInfo>& blockServices, const std::vector<BlockServiceId>& currentBlockServices) {
void BlockServicesCacheDB::updateCache(const std::vector<BlockServiceInfo>& blockServices, const std::vector<BlockServiceInfoShort>& currentBlockServices) {
LOG_INFO(_env, "Updating block service cache");
std::unique_lock _(_mutex);
@@ -208,13 +277,11 @@ void BlockServicesCacheDB::updateCache(const std::vector<BlockServiceInfo>& bloc
// then the current block services
ALWAYS_ASSERT(currentBlockServices.size() < 256); // TODO handle this properly
_currentBlockServices.clear();
for (BlockServiceId id: currentBlockServices) {
_currentBlockServices.emplace_back(id.u64);
}
OwnedValue<CurrentBlockServicesBody> currentBody(_currentBlockServices.size());
for (int i = 0; i < _currentBlockServices.size(); i++) {
currentBody().set(i, _currentBlockServices[i]);
for (auto& bs: currentBlockServices) {
_currentBlockServices.emplace_back(bs);
}
OwnedValue<CurrentBlockServicesBody> currentBody(_currentBlockServices);
ROCKS_DB_CHECKED(batch.Put(_blockServicesCF, blockServicesCacheKey(&CURRENT_BLOCK_SERVICES_KEY), currentBody.toSlice()));
// We intentionally do not flush here, it's not critical
+4 -4
View File
@@ -25,12 +25,12 @@ private:
std::shared_mutex& _shared_mutex;
public:
const std::unordered_map<uint64_t, BlockServiceCache>& blockServices;
const std::vector<uint64_t>& currentBlockServices;
const std::vector<BlockServiceInfoShort>& currentBlockServices;
BlockServicesCache(
std::shared_mutex& mutex,
const std::unordered_map<uint64_t, BlockServiceCache>& blockServices_,
const std::vector<uint64_t>& currentBlockServices_
const std::vector<BlockServiceInfoShort>& currentBlockServices_
) :
_shared_mutex(mutex), blockServices(blockServices_), currentBlockServices(currentBlockServices_)
{
@@ -58,13 +58,13 @@ private:
// Cache of all the block services as an in-memory map.
std::unordered_map<uint64_t, BlockServiceCache> _blockServices;
// The block services that we currently want to write to.
std::vector<uint64_t> _currentBlockServices;
std::vector<BlockServiceInfoShort> _currentBlockServices;
public:
BlockServicesCacheDB(Logger& logger, std::shared_ptr<XmonAgent>& xmon, const SharedRocksDB& sharedDB);
static std::vector<rocksdb::ColumnFamilyDescriptor> getColumnFamilyDescriptors();
void updateCache(const std::vector<BlockServiceInfo>& blockServices, const std::vector<BlockServiceId>& currentBlockServices);
void updateCache(const std::vector<BlockServiceInfo>& blockServices, const std::vector<BlockServiceInfoShort>& currentBlockServices);
// We've seen at least one `updateCache()`, or we've loaded the
// block services from the cache.
+46 -2
View File
@@ -872,7 +872,26 @@ public:
req.second.lastSent = now;
ProxyShardReqMsg reqMsg;
reqMsg.id = req.first;
reqMsg.body = req.second.req.msg.body;
switch (req.second.req.msg.body.kind()) {
case ShardMessageKind::ADD_SPAN_INITIATE:
{
auto& addSpanReq = reqMsg.body.setAddSpanAtLocationInitiate();
addSpanReq.locationId = _shared.options.location;
addSpanReq.req.reference = NULL_INODE_ID;
addSpanReq.req.req = req.second.req.msg.body.getAddSpanInitiate();
break;
}
case ShardMessageKind::ADD_SPAN_INITIATE_WITH_REFERENCE:
{
auto& addSpanReq = reqMsg.body.setAddSpanAtLocationInitiate();
addSpanReq.locationId = _shared.options.location;
addSpanReq.req = req.second.req.msg.body.getAddSpanInitiateWithReference();
break;
}
default:
reqMsg.body = req.second.req.msg.body;
}
_sender.prepareOutgoingMessage(
_env,
_shared.sock().addr(),
@@ -983,6 +1002,31 @@ public:
resp.body = std::move(it->second);
_proxiedResponses.erase(it);
}
if (resp.body.kind() == ShardMessageKind::ADD_SPAN_AT_LOCATION_INITIATE) {
ShardRespContainer tmpResp;
switch (request.msg.body.kind()) {
case ShardMessageKind::ADD_SPAN_INITIATE:
{
auto& addResp = tmpResp.setAddSpanInitiate();
addResp.blocks = std::move(resp.body.getAddSpanAtLocationInitiate().resp.blocks);
resp.body.setAddSpanInitiate().blocks = std::move(addResp.blocks);
break;
}
case ShardMessageKind::ADD_SPAN_INITIATE_WITH_REFERENCE:
{
auto& addResp = tmpResp.setAddSpanInitiateWithReference();
addResp.resp.blocks = std::move(resp.body.getAddSpanAtLocationInitiate().resp.blocks);
resp.body.setAddSpanInitiateWithReference().resp.blocks = std::move(addResp.resp.blocks);
break;
}
case ShardMessageKind::ADD_SPAN_AT_LOCATION_INITIATE:
{
break;
}
default:
ALWAYS_ASSERT(false, "Unexpected reponse kind %s for requests kind %s", resp.body.kind(), request.msg.body.kind() );
}
}
packShardResponse(_env, _shared, _shared.sock().addr(), _sender, dropArtificially, request, resp);
}
break;
@@ -1712,7 +1756,7 @@ private:
uint16_t _shucklePort;
XmonNCAlert _alert;
std::vector<BlockServiceInfo> _blockServices;
std::vector<BlockServiceId> _currentBlockServices;
std::vector<BlockServiceInfoShort> _currentBlockServices;
bool _updatedOnce;
public:
ShardBlockServiceUpdater(Logger& logger, std::shared_ptr<XmonAgent>& xmon, ShardShared& shared):
+73 -22
View File
@@ -21,6 +21,7 @@
#include "Bincode.hpp"
#include "BlockServicesCacheDB.hpp"
#include "Common.hpp"
#include "Msgs.hpp"
#include "MsgsGen.hpp"
#include "crc32c.h"
#include "Crypto.hpp"
@@ -906,6 +907,7 @@ struct ShardDBImpl {
location.parity = spanBlock.parity();
location.stripes = spanBlock.stripes();
location.cellSize = spanBlock.cellSize();
location.storageClass = spanBlock.storageClass();
location.blocks.els.resize(spanBlock.parity().blocks());
for (int j = 0; j < spanBlock.parity().blocks(); j++) {
auto block = spanBlock.block(j);
@@ -1036,6 +1038,7 @@ struct ShardDBImpl {
break;
case ShardMessageKind::FILE_SPANS:
err = _fileSpans(options, req.getFileSpans(), resp.setFileSpans());
break;
case ShardMessageKind::BLOCK_SERVICE_FILES:
err = _blockServiceFiles(options, req.getBlockServiceFiles(), resp.setBlockServiceFiles());
break;
@@ -1353,12 +1356,11 @@ struct ShardDBImpl {
bool _blockServiceMatchesBlacklist(
const std::vector<BlacklistEntry>& blacklists,
const std::array<uint8_t, 16>& failureDomain,
BlockServiceId blockServiceId,
const BlockServiceCache& cache
const FailureDomain& failureDomain,
BlockServiceId blockServiceId
) {
for (const auto& blacklist: blacklists) {
if (blacklist.failureDomain.name.data == failureDomain || blacklist.blockService == blockServiceId) {
if (blacklist.blockService == blockServiceId || blacklist.failureDomain == failureDomain) {
return true;
}
}
@@ -1416,7 +1418,8 @@ struct ShardDBImpl {
return EggsError::NO_ERROR;
}
EggsError _prepareAddSpanInitiate(const rocksdb::ReadOptions& options, EggsTime time, const AddSpanInitiateReq& req, InodeId reference, AddSpanInitiateEntry& entry, bool withReference) {
EggsError _prepareAddSpanInitiate(const rocksdb::ReadOptions& options, EggsTime time, const AddSpanAtLocationInitiateReq& request, InodeId reference, AddSpanAtLocationInitiateEntry& entry) {
auto& req = request.req.req;
if (req.fileId.type() != InodeType::FILE && req.fileId.type() != InodeType::SYMLINK) {
return EggsError::TYPE_IS_DIRECTORY;
}
@@ -1445,7 +1448,8 @@ struct ShardDBImpl {
}
// start filling in entry
entry.withReference = withReference;
entry.locationId = request.locationId;
entry.withReference = false;
entry.fileId = req.fileId;
entry.byteOffset = req.byteOffset;
entry.storageClass = req.storageClass;
@@ -1455,6 +1459,11 @@ struct ShardDBImpl {
entry.crc = req.crc;
entry.stripes = req.stripes;
//TODO hack for failover to flash in NOK
if (entry.locationId == 1 && entry.storageClass == HDD_STORAGE) {
entry.storageClass = FLASH_STORAGE;
}
// fill stripe CRCs
for (int s = 0; s < req.stripes; s++) {
uint32_t stripeCrc = 0;
@@ -1476,20 +1485,23 @@ struct ShardDBImpl {
LOG_DEBUG(_env, "Starting out with %s current block services", candidateBlockServices.size());
std::vector<BlacklistEntry> blacklist{req.blacklist.els};
{
for (BlockServiceId id: inMemoryBlockServicesData.currentBlockServices) {
const auto& cache = inMemoryBlockServicesData.blockServices.at(id.u64);
if (cache.storageClass != entry.storageClass) {
LOG_DEBUG(_env, "Skipping %s because of different storage class (%s != %s)", id, (int)cache.storageClass, (int)entry.storageClass);
for (BlockServiceInfoShort bs: inMemoryBlockServicesData.currentBlockServices) {
if (bs.locationId != entry.locationId) {
LOG_DEBUG(_env, "Skipping %s because of location mismatch(%s != %s)", bs.id, (int)bs.locationId, (int)entry.locationId);
continue;
}
if (_blockServiceMatchesBlacklist(blacklist, cache.failureDomain, id, cache)) {
LOG_DEBUG(_env, "Skipping %s because it matches blacklist", id);
if (bs.storageClass != entry.storageClass) {
LOG_DEBUG(_env, "Skipping %s because of different storage class (%s != %s)", bs.id, (int)bs.storageClass, (int)entry.storageClass);
continue;
}
candidateBlockServices.emplace_back(id);
if (_blockServiceMatchesBlacklist(blacklist, bs.failureDomain, bs.id)) {
LOG_DEBUG(_env, "Skipping %s because it matches blacklist", bs.id);
continue;
}
candidateBlockServices.emplace_back(bs.id);
BlacklistEntry newBlacklistEntry;
newBlacklistEntry.failureDomain.name.data = cache.failureDomain;
newBlacklistEntry.blockService = id;
newBlacklistEntry.failureDomain = bs.failureDomain;
newBlacklistEntry.blockService = bs.id;
blacklist.emplace_back(std::move(newBlacklistEntry));
}
}
@@ -1683,6 +1695,22 @@ struct ShardDBImpl {
return EggsError::NO_ERROR;
}
// Validates an ADD_SPAN_LOCATION request and copies its fields into the
// replication log entry. Returns TYPE_IS_DIRECTORY or BAD_SHARD on invalid
// inodes, NO_ERROR otherwise.
// NOTE(review): `time` is unused in this body — presumably kept only for
// signature parity with the other _prepare* handlers; confirm.
EggsError _prepareAddSpanLocation(EggsTime time, const AddSpanLocationReq& req, AddSpanLocationEntry& entry) {
// Spans exist only on files/symlinks; reject directories outright.
if (req.fileId1.type() == InodeType::DIRECTORY || req.fileId2.type() == InodeType::DIRECTORY) {
return EggsError::TYPE_IS_DIRECTORY;
}
// Both inodes must belong to this shard.
if (req.fileId1.shard() != _shid || req.fileId2.shard() != _shid) {
return EggsError::BAD_SHARD;
}
// Same source and destination file is a caller bug, not a recoverable error.
ALWAYS_ASSERT(req.fileId1 != req.fileId2);
entry.fileId1 = req.fileId1;
entry.byteOffset1 = req.byteOffset1;
entry.blocks1 = req.blocks1;
entry.fileId2 = req.fileId2;
entry.byteOffset2 = req.byteOffset2;
return EggsError::NO_ERROR;
}
EggsError _prepareMoveSpan(EggsTime time, const MoveSpanReq& req, MoveSpanEntry& entry) {
if (req.fileId1.type() == InodeType::DIRECTORY || req.fileId2.type() == InodeType::DIRECTORY) {
return EggsError::TYPE_IS_DIRECTORY;
@@ -1793,12 +1821,24 @@ struct ShardDBImpl {
err = _prepareAddInlineSpan(time, req.getAddInlineSpan(), logEntryBody.setAddInlineSpan());
break;
case ShardMessageKind::ADD_SPAN_INITIATE: {
const auto& addSpanReq = req.getAddSpanInitiate();
err = _prepareAddSpanInitiate(options, time, addSpanReq, addSpanReq.fileId, logEntryBody.setAddSpanInitiate(), false);
AddSpanAtLocationInitiateReq spanInitiateAtLocationReq;
spanInitiateAtLocationReq.locationId = DEFAULT_LOCATION;
spanInitiateAtLocationReq.req.reference = NULL_INODE_ID;
spanInitiateAtLocationReq.req.req = req.getAddSpanInitiate();
err = _prepareAddSpanInitiate(options, time, spanInitiateAtLocationReq, spanInitiateAtLocationReq.req.req.fileId, logEntryBody.setAddSpanAtLocationInitiate());
break; }
case ShardMessageKind::ADD_SPAN_INITIATE_WITH_REFERENCE: {
const auto& addSpanReq = req.getAddSpanInitiateWithReference();
err = _prepareAddSpanInitiate(options, time, addSpanReq.req, addSpanReq.reference, logEntryBody.setAddSpanInitiate(), true);
AddSpanAtLocationInitiateReq spanInitiateAtLocationReq;
spanInitiateAtLocationReq.locationId = DEFAULT_LOCATION;
spanInitiateAtLocationReq.req = req.getAddSpanInitiateWithReference();
err = _prepareAddSpanInitiate(options, time, spanInitiateAtLocationReq, spanInitiateAtLocationReq.req.reference, logEntryBody.setAddSpanAtLocationInitiate());
break; }
case ShardMessageKind::ADD_SPAN_AT_LOCATION_INITIATE: {
auto reference = req.getAddSpanAtLocationInitiate().req.reference;
if (reference == NULL_INODE_ID) {
reference = req.getAddSpanAtLocationInitiate().req.req.fileId;
}
err = _prepareAddSpanInitiate(options, time, req.getAddSpanAtLocationInitiate(), reference, logEntryBody.setAddSpanAtLocationInitiate());
break; }
case ShardMessageKind::ADD_SPAN_CERTIFY:
err = _prepareAddSpanCertify(time, req.getAddSpanCertify(), logEntryBody.setAddSpanCertify());
@@ -1827,6 +1867,9 @@ struct ShardDBImpl {
case ShardMessageKind::SWAP_SPANS:
err = _prepareSwapSpans(time, req.getSwapSpans(), logEntryBody.setSwapSpans());
break;
case ShardMessageKind::ADD_SPAN_LOCATION:
err = _prepareAddSpanLocation(time, req.getAddSpanLocation(), logEntryBody.setAddSpanLocation());
break;
default:
throw EGGS_EXCEPTION("bad write shard message kind %s", req.kind());
}
@@ -3147,7 +3190,7 @@ struct ShardDBImpl {
std::string sourceFileValue;
ExternalValue<TransientFileBody> sourceFile;
{
EggsError err = _getTransientFile({}, time, false, entry.fileId2, sourceFileValue, sourceFile);
EggsError err = _initiateTransientFileModification(time, false, batch, entry.fileId1, sourceFileValue, sourceFile);
if (err != EggsError::NO_ERROR) {
return err;
}
@@ -3222,13 +3265,20 @@ struct ShardDBImpl {
_addBlockServicesToFiles(batch, block.blockService(), entry.fileId1, -1);
}
sourceFile().setFileSize(sourceFile().fileSize() - sourceSpan().spanSize());
{
auto k = InodeIdKey::Static(entry.fileId1);
ROCKS_DB_CHECKED(batch.Put(_transientCf, k.toSlice(), sourceFile.toSlice()));
}
OwnedValue<SpanBody> newDestinationSpan(destinationSpan(), blocksSource);
// now persist new state in destination
ROCKS_DB_CHECKED(batch.Put(_spansCf, destinationSpanKey.toSlice(), destinationSpan.toSlice()));
ROCKS_DB_CHECKED(batch.Put(_spansCf, destinationSpanKey.toSlice(), newDestinationSpan.toSlice()));
// and delete span at source
ROCKS_DB_CHECKED(batch.Delete(_spansCf, sourceSpanKey.toSlice()));
// change size and dirtiness
sourceFile().setFileSize(sourceFile().fileSize() - sourceSpan().spanSize());
return EggsError::NO_ERROR;
}
@@ -3823,6 +3873,7 @@ struct ShardDBImpl {
break; }
case ShardLogEntryKind::ADD_SPAN_AT_LOCATION_INITIATE: {
err = _applyAddSpanInitiate(time, batch, logEntryBody.getAddSpanAtLocationInitiate(), resp.setAddSpanAtLocationInitiate());
break;
}
case ShardLogEntryKind::ADD_SPAN_CERTIFY:
err = _applyAddSpanCertify(time, batch, logEntryBody.getAddSpanCertify(), resp.setAddSpanCertify());
+2 -1
View File
@@ -338,7 +338,7 @@ struct SpanBody {
static size_t calcSize(const SpanBody& existingSpan, const BlocksBodyWrapper& newLocationBlocks) {
ALWAYS_ASSERT(!existingSpan.isInlineStorage());
size_t size = MIN_SIZE;
for (uint8_t locIdx = 0; locIdx <= existingSpan.locationCount(); ++locIdx) {
for (uint8_t locIdx = 0; locIdx < existingSpan.locationCount(); ++locIdx) {
auto blocks = existingSpan.blocksBodyReadOnly(locIdx);
ALWAYS_ASSERT(blocks.location() != newLocationBlocks.location());
size += SpanBlocksBody::calcSize(
@@ -372,6 +372,7 @@ struct SpanBody {
SpanBlocksBodyV0 blocks(existingSpan._data + MIN_SIZE);
SpanBlocksBody blocksNew = blocksBody(0);
blocksNew.afterAlloc(LocationBlocksInfo(DEFAULT_LOCATION,existingSpan._storageClassOrLocationCount(), blocks.parity(), blocks.stripes()));
blocksNew.setCellSize(existingSpan.blocksBodyReadOnly(0).cellSize());
// now copy the block and crc info. layout is the same
memcpy(blocksNew._data + SpanBlocksBody::MIN_SIZE, blocks._data + SpanBlocksBodyV0::MIN_SIZE, blocks.size() - SpanBlocksBodyV0::MIN_SIZE);
} else {
+2
View File
@@ -115,6 +115,8 @@ func readShuckleResponse(
resp = &msgs.RegisterBlockServicesResp{}
case msgs.SHARDS_AT_LOCATION:
resp = &msgs.ShardsAtLocationResp{}
case msgs.SHARD_BLOCK_SERVICES:
resp = &msgs.ShardBlockServicesResp{}
default:
return nil, fmt.Errorf("bad shuckle response kind %v", kind)
}
+1 -3
View File
@@ -23,9 +23,7 @@ func WaitForBlockServices(ll *lib.Logger, shuckleAddress string, expectedBlockSe
ll.Debug("%v", err)
goto KeepChecking
}
if len(bss) > expectedBlockServices {
panic(fmt.Errorf("got more block services than expected (%v > %v)", len(bss), expectedBlockServices))
}
if waitCurrentServicesCalcuation {
resp, err = ShuckleRequest(ll, nil, shuckleAddress, &msgs.ShardBlockServicesDEPRECATEDReq{0})
if err != nil || len(resp.(*msgs.ShardBlockServicesDEPRECATEDResp).BlockServices) == 0 {
+46 -14
View File
@@ -33,7 +33,7 @@ func main() {
profile := flag.Bool("profile", false, "Whether to run code (both Go and C++) with profiling.")
shuckleBincodePort := flag.Uint("shuckle-bincode-port", 10001, "")
shuckleHttpPort := flag.Uint("shuckle-http-port", 10000, "")
startingPort := flag.Uint("start-port", 10002, "The services will be assigned port in this order, CDC, shard_000, ..., shard_255, bs_0, ..., bs_n. If 0, ports will be chosen randomly.")
startingPort := flag.Uint("start-port", 10003, "The services will be assigned port in this order, CDC, shard_000, ..., shard_255, bs_0, ..., bs_n. If 0, ports will be chosen randomly.")
repoDir := flag.String("repo-dir", "", "Used to build C++/Go binaries. If not provided, the path will be derived form the filename at build time (so will only work locally).")
binariesDir := flag.String("binaries-dir", "", "If provided, nothing will be built, instead it'll be assumed that the binaries will be in the specified directory.")
xmon := flag.String("xmon", "", "")
@@ -111,6 +111,7 @@ func main() {
ShuckleExe: path.Join(*binariesDir, "eggsshuckle"),
BlocksExe: path.Join(*binariesDir, "eggsblocks"),
FuseExe: path.Join(*binariesDir, "eggsfuse"),
ShuckleProxyExe: path.Join(*binariesDir, "eggsshuckleproxy"),
}
} else {
fmt.Printf("building shard/cdc/blockservice/shuckle\n")
@@ -134,9 +135,12 @@ func main() {
Dir: path.Join(*dataDir, "shuckle"),
Xmon: *xmon,
ScriptsJs: *shuckleScriptsJs,
Addr1: "127.0.0.1:10001",
Addr1: shuckleAddress,
})
shuckleProxyPort := *shuckleBincodePort + 1
shuckleProxyAddress := fmt.Sprintf("127.0.0.1:%v", shuckleProxyPort)
numLocations := 1
if *multiLocation {
// Waiting for shuckle
@@ -152,6 +156,17 @@ func main() {
panic(fmt.Errorf("failed to create location %v", err))
}
}
procs.StartShuckleProxy(
log, &managedprocess.ShuckleProxyOpts{
Exe: goExes.ShuckleProxyExe,
LogLevel: level,
Dir: path.Join(*dataDir, "shuckleproxy"),
Xmon: *xmon,
Addr1: shuckleProxyAddress,
ShuckleAddress: shuckleAddress,
Location: 1,
},
)
numLocations = 2
}
replicaCount := 5
@@ -169,6 +184,11 @@ func main() {
}
}
for loc := uint(0); loc < uint(numLocations); loc++ {
shuckleAddressToUse := shuckleAddress
if loc == 1{
shuckleAddressToUse = shuckleProxyAddress
}
for i := uint(0); i < *failureDomains; i++ {
dirName := fmt.Sprintf("bs_%d", i)
if loc > 0 {
@@ -181,7 +201,7 @@ func main() {
FailureDomain: fmt.Sprintf("%d_%d", i, loc),
Location: msgs.Location(loc),
LogLevel: level,
ShuckleAddress: fmt.Sprintf("127.0.0.1:%d", *shuckleBincodePort),
ShuckleAddress: shuckleAddressToUse,
Profile: *profile,
Xmon: *xmon,
ReserverdStorage: 10 << 30, // 10GB
@@ -238,6 +258,10 @@ func main() {
// Start shards
for loc := 0; loc < numLocations; loc++ {
shuckleAddressToUse := shuckleAddress
if loc == 1{
shuckleAddressToUse = shuckleProxyAddress
}
for i := 0; i < 256; i++ {
for r := uint8(0); r < uint8(replicaCount); r++ {
shrid := msgs.MakeShardReplicaId(msgs.ShardId(i), msgs.ReplicaId(r))
@@ -251,7 +275,7 @@ func main() {
Dir: path.Join(*dataDir, dirName),
LogLevel: level,
Valgrind: *buildType == "valgrind",
ShuckleAddress: shuckleAddress,
ShuckleAddress: shuckleAddressToUse,
Perf: *profile,
Xmon: *xmon,
Location: msgs.Location(loc),
@@ -278,17 +302,25 @@ func main() {
client.WaitForClient(log, shuckleAddress, waitShuckleFor)
if !*noFuse {
fuseMountPoint := procs.StartFuse(log, &managedprocess.FuseOpts{
Exe: goExes.FuseExe,
Path: path.Join(*dataDir, "fuse"),
LogLevel: level,
Wait: true,
ShuckleAddress: shuckleAddress,
Profile: *profile,
UseRandomFetchApi: *useRandomFetchApi,
})
for loc :=0; loc < numLocations; loc++ {
shuckleAddressToUse := shuckleAddress
fuseDir := "fuse"
if loc == 1 {
shuckleAddressToUse = shuckleProxyAddress
fuseDir = "fuse1"
}
fuseMountPoint := procs.StartFuse(log, &managedprocess.FuseOpts{
Exe: goExes.FuseExe,
Path: path.Join(*dataDir, fuseDir),
LogLevel: level,
Wait: true,
ShuckleAddress: shuckleAddressToUse,
Profile: *profile,
UseRandomFetchApi: *useRandomFetchApi,
})
fmt.Printf("operational, mounted at %v\n", fuseMountPoint)
fmt.Printf("operational, mounted at %v\n", fuseMountPoint)
}
} else {
fmt.Printf("operational\n")
}
+43
View File
@@ -438,11 +438,53 @@ func (procs *ManagedProcesses) StartShuckle(ll *lib.Logger, opts *ShuckleOpts) {
})
}
// ShuckleProxyOpts configures a shuckle proxy process launched via
// StartShuckleProxy.
type ShuckleProxyOpts struct {
Exe string // path to the eggsshuckleproxy binary
Dir string // data directory; receives log/stdout/stderr files
LogLevel lib.LogLevel // DEBUG adds -verbose, TRACE adds -trace
Xmon string // xmon endpoint; empty omits the -xmon flag
Addr1 string // primary listen address (always passed as -addr)
Addr2 string // optional second listen address; empty omits it
ShuckleAddress string // address of the real shuckle this proxy fronts
Location msgs.Location // location id passed to the proxy as -location
}
// StartShuckleProxy launches an eggsshuckleproxy process with the given
// options and registers it with the managed-process set; the process is
// terminated when the manager exits (TerminateOnExit).
func (procs *ManagedProcesses) StartShuckleProxy(ll *lib.Logger, opts *ShuckleProxyOpts) {
createDataDir(opts.Dir)
// Mandatory flags; optional ones are appended below.
args := []string{
"-log-file", path.Join(opts.Dir, "log"),
"-addr", opts.Addr1,
"-shuckle-address", opts.ShuckleAddress,
"-location", fmt.Sprintf("%d",opts.Location),
}
// DEBUG and TRACE are checked separately: TRACE does not imply -verbose here.
if opts.LogLevel == lib.DEBUG {
args = append(args, "-verbose")
}
if opts.LogLevel == lib.TRACE {
args = append(args, "-trace")
}
if opts.Xmon != "" {
args = append(args, "-xmon", opts.Xmon)
}
// A second -addr is only passed when a second listen address is configured.
if opts.Addr2 != "" {
args = append(args, "-addr", opts.Addr2)
}
procs.Start(ll, &ManagedProcessArgs{
Name: "shuckleproxy",
Exe: opts.Exe,
Args: args,
StdoutFile: path.Join(opts.Dir, "stdout"),
StderrFile: path.Join(opts.Dir, "stderr"),
TerminateOnExit: true,
})
}
type GoExes struct {
ShuckleExe string
BlocksExe string
FuseExe string
ShuckleBeaconExe string
ShuckleProxyExe string
}
func BuildGoExes(ll *lib.Logger, repoDir string, race bool) *GoExes {
@@ -463,6 +505,7 @@ func BuildGoExes(ll *lib.Logger, repoDir string, race bool) *GoExes {
BlocksExe: path.Join(goDir(repoDir), "eggsblocks", "eggsblocks"),
FuseExe: path.Join(goDir(repoDir), "eggsfuse", "eggsfuse"),
ShuckleBeaconExe: path.Join(goDir(repoDir), "eggsshucklebeacon", "eggsshucklebeacon"),
ShuckleProxyExe: path.Join(goDir(repoDir), "eggsshuckleproxy", "eggsshuckleproxy"),
}
}