From e1b8de02dc0034eb523834728beb0d8564e4bfbd Mon Sep 17 00:00:00 2001 From: Francesco Mazzoli Date: Wed, 15 Feb 2023 14:03:53 +0000 Subject: [PATCH] More assorted improvements --- cpp/cdc/CDC.cpp | 5 +- cpp/cdc/CDCDB.cpp | 244 ++++++++++++++++++------ cpp/cdc/CDCDB.hpp | 14 +- cpp/core/MsgsGen.cpp | 350 ++++++++++++++++++---------------- cpp/core/MsgsGen.hpp | 138 +++++++------- cpp/core/Shuckle.cpp | 6 +- cpp/core/Shuckle.hpp | 11 +- cpp/shard/Shard.cpp | 1 - cpp/shard/ShardDB.cpp | 3 +- cpp/shard/ShardDBData.hpp | 1 - go/bincodegen/bincodegen.go | 7 +- go/eggs/client.go | 66 +++++++ go/eggs/gc.go | 27 ++- go/eggsfuse/eggsfuse.go | 59 +----- go/eggsshuckle/base.html | 2 +- go/eggsshuckle/directory.html | 12 +- go/eggsshuckle/eggsshuckle.go | 65 ++++--- go/eggsshuckle/file.html | 6 +- go/eggsshuckle/index.html | 4 + go/gcdaemon/gcdaemon.go | 93 --------- go/msgs/msgs.go | 7 +- go/msgs/msgs_bincode.go | 18 ++ 22 files changed, 635 insertions(+), 504 deletions(-) delete mode 100644 go/gcdaemon/gcdaemon.go diff --git a/cpp/cdc/CDC.cpp b/cpp/cdc/CDC.cpp index d7770cf4..32c000a8 100644 --- a/cpp/cdc/CDC.cpp +++ b/cpp/cdc/CDC.cpp @@ -19,7 +19,6 @@ #include "Env.hpp" #include "Exception.hpp" #include "Msgs.hpp" -#include "MsgsGen.hpp" #include "Time.hpp" #include "Undertaker.hpp" #include "CDCDB.hpp" @@ -585,7 +584,9 @@ public: continue; } LOG_INFO(_env, "Registering ourselves (CDC, port %s) with shuckle", port); - std::string err = registerCDC(_shuckleHost, _shucklePort, 100_ms, _ownIp, port); + CDCStatus status; + _shared.db.status(status); + std::string err = registerCDC(_shuckleHost, _shucklePort, 100_ms, _ownIp, port, status); if (!err.empty()) { RAISE_ALERT(_env, "Couldn't register ourselves with shuckle: %s", err); EggsTime successfulIterationAt = 0; diff --git a/cpp/cdc/CDCDB.cpp b/cpp/cdc/CDCDB.cpp index b13e74fc..e21e41aa 100644 --- a/cpp/cdc/CDCDB.cpp +++ b/cpp/cdc/CDCDB.cpp @@ -12,7 +12,6 @@ #include "Env.hpp" #include "Exception.hpp" #include "Msgs.hpp" -#include "MsgsGen.hpp" #include "RocksDBUtils.hpp" #include "ShardDB.hpp" @@ -204,15 +203,28 @@ struct MakeDirectoryStateMachine { {} void resume(EggsError err, const ShardRespContainer* resp) { - ALWAYS_ASSERT((err == NO_ERROR && resp == nullptr) == (env.txnStep == TXN_START)); - switch (env.txnStep) { - case TXN_START: start(); break; - case MAKE_DIRECTORY_LOOKUP: afterLookup(err, resp); break; - case MAKE_DIRECTORY_CREATE_DIR: afterCreateDirectoryInode(err, resp); break; - case MAKE_DIRECTORY_CREATE_LOCKED_EDGE: afterCreateLockedEdge(err, resp); break; - case MAKE_DIRECTORY_UNLOCK_EDGE: afterUnlockEdge(err, resp); break; - case MAKE_DIRECTORY_ROLLBACK: afterRollback(err, resp); break; - default: throw EGGS_EXCEPTION("bad step %s", env.txnStep); + if (env.txnStep == TXN_START) { + start(); + return; + } + if (unlikely(err == NO_ERROR && resp == nullptr)) { // we're resuming with no response + switch (env.txnStep) { + case MAKE_DIRECTORY_LOOKUP: lookup(); break; + case MAKE_DIRECTORY_CREATE_DIR: createDirectoryInode(); break; + case MAKE_DIRECTORY_CREATE_LOCKED_EDGE: createLockedEdge(); break; + case MAKE_DIRECTORY_UNLOCK_EDGE: unlockEdge(); break; + case MAKE_DIRECTORY_ROLLBACK: rollback(); break; + default: throw EGGS_EXCEPTION("bad step %s", env.txnStep); + } + } else { + switch (env.txnStep) { + case MAKE_DIRECTORY_LOOKUP: afterLookup(err, resp); break; + case MAKE_DIRECTORY_CREATE_DIR: afterCreateDirectoryInode(err, resp); break; + case MAKE_DIRECTORY_CREATE_LOCKED_EDGE: afterCreateLockedEdge(err, resp); break; + case MAKE_DIRECTORY_UNLOCK_EDGE: afterUnlockEdge(err, resp); break; + case MAKE_DIRECTORY_ROLLBACK: afterRollback(err, resp); break; + default: throw EGGS_EXCEPTION("bad step %s", env.txnStep); + } } } @@ -351,11 +363,20 @@ struct HardUnlinkDirectoryStateMachine { {} void resume(EggsError err, const ShardRespContainer* resp) { - ALWAYS_ASSERT((err == NO_ERROR && resp == nullptr) == (env.txnStep == TXN_START)); - switch (env.txnStep) { - case TXN_START: removeInode(); break; - case HARD_UNLINK_DIRECTORY_REMOVE_INODE: afterRemoveInode(err, resp); break; - default: throw EGGS_EXCEPTION("bad step %s", env.txnStep); + if (env.txnStep == TXN_START) { + removeInode(); + return; + } + if (unlikely(err == NO_ERROR && resp == nullptr)) { // we're resuming with no response + switch (env.txnStep) { + case HARD_UNLINK_DIRECTORY_REMOVE_INODE: removeInode(); break; + default: throw EGGS_EXCEPTION("bad step %s", env.txnStep); + } + } else { + switch (env.txnStep) { + case HARD_UNLINK_DIRECTORY_REMOVE_INODE: afterRemoveInode(err, resp); break; + default: throw EGGS_EXCEPTION("bad step %s", env.txnStep); + } } } @@ -404,15 +425,28 @@ struct RenameFileStateMachine { {} void resume(EggsError err, const ShardRespContainer* resp) { - ALWAYS_ASSERT((err == NO_ERROR && resp == nullptr) == (env.txnStep == TXN_START)); - switch (env.txnStep) { - case TXN_START: start(); break; - case RENAME_FILE_LOCK_OLD_EDGE: afterLockOldEdge(err, resp); break; - case RENAME_FILE_CREATE_NEW_LOCKED_EDGE: afterCreateNewLockedEdge(err, resp); break; - case RENAME_FILE_UNLOCK_NEW_EDGE: afterUnlockNewEdge(err, resp); break; - case RENAME_FILE_UNLOCK_OLD_EDGE: afterUnlockOldEdge(err, resp); break; - case RENAME_FILE_ROLLBACK: afterRollback(err, resp); break; - default: throw EGGS_EXCEPTION("bad step %s", env.txnStep); + if (env.txnStep == TXN_START) { + start(); + return; + } + if (unlikely(err == NO_ERROR && resp == nullptr)) { // we're resuming with no response + switch (env.txnStep) { + case RENAME_FILE_LOCK_OLD_EDGE: lockOldEdge(); break; + case RENAME_FILE_CREATE_NEW_LOCKED_EDGE: createNewLockedEdge(); break; + case RENAME_FILE_UNLOCK_NEW_EDGE: unlockNewEdge(); break; + case RENAME_FILE_UNLOCK_OLD_EDGE: unlockOldEdge(); break; + case RENAME_FILE_ROLLBACK: rollback(); break; + default: throw EGGS_EXCEPTION("bad step %s", env.txnStep); + } + } else { + switch (env.txnStep) { + case RENAME_FILE_LOCK_OLD_EDGE: afterLockOldEdge(err, resp); break; + case RENAME_FILE_CREATE_NEW_LOCKED_EDGE: afterCreateNewLockedEdge(err, resp); break; + case RENAME_FILE_UNLOCK_NEW_EDGE: afterUnlockNewEdge(err, resp); break; + case RENAME_FILE_UNLOCK_OLD_EDGE: afterUnlockOldEdge(err, resp); break; + case RENAME_FILE_ROLLBACK: afterRollback(err, resp); break; + default: throw EGGS_EXCEPTION("bad step %s", env.txnStep); + } } } @@ -565,15 +599,29 @@ struct SoftUnlinkDirectoryStateMachine { {} void resume(EggsError err, const ShardRespContainer* resp) { - ALWAYS_ASSERT((err == NO_ERROR && resp == nullptr) == (env.txnStep == TXN_START)); - switch (env.txnStep) { - case TXN_START: start(); break; - case SOFT_UNLINK_DIRECTORY_LOCK_EDGE: afterLockEdge(err, resp); break; - case SOFT_UNLINK_DIRECTORY_STAT: afterStat(err, resp); break; - case SOFT_UNLINK_DIRECTORY_REMOVE_OWNER: afterRemoveOwner(err, resp); break; - case SOFT_UNLINK_DIRECTORY_UNLOCK_EDGE: afterUnlockEdge(err, resp); break; - case SOFT_UNLINK_DIRECTORY_ROLLBACK: afterRollback(err, resp); break; - default: throw EGGS_EXCEPTION("bad step %s", env.txnStep); + if (env.txnStep == TXN_START) { + start(); + return; + } + if (unlikely(err == NO_ERROR && resp == nullptr)) { // we're resuming with no response + switch (env.txnStep) { + case SOFT_UNLINK_DIRECTORY_LOCK_EDGE: lockEdge(); break; + case SOFT_UNLINK_DIRECTORY_STAT: stat(); break; + // We don't store the directory info, so we need to restart the stating when resuming + case SOFT_UNLINK_DIRECTORY_REMOVE_OWNER: stat(); break; + case SOFT_UNLINK_DIRECTORY_UNLOCK_EDGE: unlockEdge(); break; + case SOFT_UNLINK_DIRECTORY_ROLLBACK: rollback(); break; + default: throw EGGS_EXCEPTION("bad step %s", env.txnStep); + } + } else { + switch (env.txnStep) { + case SOFT_UNLINK_DIRECTORY_LOCK_EDGE: afterLockEdge(err, resp); break; + case SOFT_UNLINK_DIRECTORY_STAT: afterStat(err, resp); break; + case SOFT_UNLINK_DIRECTORY_REMOVE_OWNER: afterRemoveOwner(err, resp); break; + case SOFT_UNLINK_DIRECTORY_UNLOCK_EDGE: afterUnlockEdge(err, resp); break; + case SOFT_UNLINK_DIRECTORY_ROLLBACK: afterRollback(err, resp); break; + default: throw EGGS_EXCEPTION("bad step %s", env.txnStep); + } } } @@ -722,16 +770,30 @@ struct RenameDirectoryStateMachine { {} void resume(EggsError err, const ShardRespContainer* resp) { - ALWAYS_ASSERT((err == NO_ERROR && resp == nullptr) == (env.txnStep == TXN_START)); - switch (env.txnStep) { - case TXN_START: start(); break; - case RENAME_DIRECTORY_LOCK_OLD_EDGE: afterLockOldEdge(err, resp); break; - case RENAME_DIRECTORY_CREATE_LOCKED_NEW_EDGE: afterCreateLockedEdge(err, resp); break; - case RENAME_DIRECTORY_UNLOCK_NEW_EDGE: afterUnlockNewEdge(err, resp); break; - case RENAME_DIRECTORY_UNLOCK_OLD_EDGE: afterUnlockOldEdge(err, resp); break; - case RENAME_DIRECTORY_SET_OWNER: afterSetOwner(err, resp); break; - case RENAME_DIRECTORY_ROLLBACK: afterRollback(err, resp); break; - default: throw EGGS_EXCEPTION("bad step %s", env.txnStep); + if (env.txnStep == TXN_START) { + start(); + return; + } + if (unlikely(err == NO_ERROR && resp == nullptr)) { // we're resuming with no response + switch (env.txnStep) { + case RENAME_DIRECTORY_LOCK_OLD_EDGE: lockOldEdge(); break; + case RENAME_DIRECTORY_CREATE_LOCKED_NEW_EDGE: createLockedNewEdge(); break; + case RENAME_DIRECTORY_UNLOCK_NEW_EDGE: unlockNewEdge(); break; + case RENAME_DIRECTORY_UNLOCK_OLD_EDGE: unlockOldEdge(); break; + case RENAME_DIRECTORY_SET_OWNER: setOwner(); break; + case RENAME_DIRECTORY_ROLLBACK: rollback(); break; + default: throw EGGS_EXCEPTION("bad step %s", env.txnStep); + } + } else { + switch (env.txnStep) { + case RENAME_DIRECTORY_LOCK_OLD_EDGE: afterLockOldEdge(err, resp); break; + case RENAME_DIRECTORY_CREATE_LOCKED_NEW_EDGE: afterCreateLockedEdge(err, resp); break; + case RENAME_DIRECTORY_UNLOCK_NEW_EDGE: afterUnlockNewEdge(err, resp); break; + case RENAME_DIRECTORY_UNLOCK_OLD_EDGE: afterUnlockOldEdge(err, resp); break; + case RENAME_DIRECTORY_SET_OWNER: afterSetOwner(err, resp); break; + case RENAME_DIRECTORY_ROLLBACK: afterRollback(err, resp); break; + default: throw EGGS_EXCEPTION("bad step %s", env.txnStep); + } } } @@ -911,6 +973,12 @@ enum CrossShardHardUnlinkFileStep : uint8_t { CROSS_SHARD_HARD_UNLINK_FILE_MAKE_TRANSIENT = 2, }; +// Steps: +// +// 1. Remove owning edge in one shard +// 2. Make file transient in other shard +// +// Step 2 cannot fail. struct CrossShardHardUnlinkFileStateMachine { StateMachineEnv& env; const CrossShardHardUnlinkFileReq& req; @@ -921,12 +989,22 @@ struct CrossShardHardUnlinkFileStateMachine { {} void resume(EggsError err, const ShardRespContainer* resp) { - ALWAYS_ASSERT((err == NO_ERROR && resp == nullptr) == (env.txnStep == TXN_START)); - switch (env.txnStep) { - case TXN_START: start(); break; - case CROSS_SHARD_HARD_UNLINK_FILE_REMOVE_EDGE: afterRemoveEdge(err, resp); break; - case CROSS_SHARD_HARD_UNLINK_FILE_MAKE_TRANSIENT: afterMakeTransient(err, resp); break; - default: throw EGGS_EXCEPTION("bad step %s", env.txnStep); + if (env.txnStep == TXN_START) { + start(); + return; + } + if (unlikely(err == NO_ERROR && resp == nullptr)) { // we're resuming with no response + switch (env.txnStep) { + case CROSS_SHARD_HARD_UNLINK_FILE_REMOVE_EDGE: removeEdge(); break; + case CROSS_SHARD_HARD_UNLINK_FILE_MAKE_TRANSIENT: makeTransient(); break; + default: throw EGGS_EXCEPTION("bad step %s", env.txnStep); + } + } else { + switch (env.txnStep) { + case CROSS_SHARD_HARD_UNLINK_FILE_REMOVE_EDGE: afterRemoveEdge(err, resp); break; + case CROSS_SHARD_HARD_UNLINK_FILE_MAKE_TRANSIENT: afterMakeTransient(err, resp); break; + default: throw EGGS_EXCEPTION("bad step %s", env.txnStep); + } } } @@ -973,14 +1051,6 @@ struct CrossShardHardUnlinkFileStateMachine { } }; -// Steps: -// -// 1. Remove owning edge in one shard -// 2. Make file transient in other shard -// -// Step 2 cannot fail. - - // Wrapper types to pack/unpack with kind struct PackCDCReq { const CDCReqContainer& req; @@ -1260,18 +1330,18 @@ struct CDCDBImpl { // Starts executing the next transaction in line, if possible. If it managed // to start something, it immediately advances it as well (no point delaying - // that). - void _startExecuting(EggsTime time, rocksdb::Transaction& dbTxn, CDCStep& step) { + // that). Returns the txn id that was started, if any. + uint64_t _startExecuting(EggsTime time, rocksdb::Transaction& dbTxn, CDCStep& step) { uint64_t executingTxn = _executingTxn(dbTxn); if (executingTxn != 0) { LOG_DEBUG(_env, "another transaction %s is already executing, can't start", executingTxn); - return; + return 0; } uint64_t txnToExecute = _reqQueuePeek(dbTxn, _cdcReq); if (txnToExecute == 0) { LOG_DEBUG(_env, "no transactions in queue found, can't start"); - return; + return 0; } _setExecutingTxn(dbTxn, txnToExecute); @@ -1279,6 +1349,7 @@ struct CDCDBImpl { StaticValue txnState; txnState().start(_cdcReq.kind()); _advance(time, dbTxn, txnToExecute, _cdcReq, NO_ERROR, nullptr, txnState, step); + return txnToExecute; } uint64_t processCDCReq( @@ -1375,10 +1446,55 @@ struct CDCDBImpl { step.clear(); _advanceLastAppliedLogEntry(*dbTxn, logIndex); - _startExecuting(time, *dbTxn, step); + uint64_t txnId = _startExecuting(time, *dbTxn, step); + if (txnId == 0) { + // no txn could be started, see if one is executing already to fill in the `step` + txnId = _executingTxn(*dbTxn); + if (txnId != 0) { + LOG_DEBUG(_env, "transaction %s is already executing, will resume it", txnId); + // Get the req + { + auto k = U64Key::Static(txnId); + std::string reqV; + ROCKS_DB_CHECKED(dbTxn->Get({}, _reqQueueCf, k.toSlice(), &reqV)); + UnpackCDCReq ureq(_cdcReq); + bincodeFromRocksValue(reqV, ureq); + } + // Get the state + std::string txnStateV; + ROCKS_DB_CHECKED(dbTxn->Get({}, _defaultCf, cdcMetadataKey(&EXECUTING_TXN_STATE_KEY), &txnStateV)); + ExternalValue txnState(txnStateV); + // Advance + _advance(time, *dbTxn, txnId, _cdcReq, NO_ERROR, nullptr, txnState, step); + } + } dbTxn->Commit(); } + + void status(CDCStatus& resp) { + memset(&resp, 0, sizeof(resp)); + std::unique_ptr dbTxn(_db->BeginTransaction({})); + uint64_t txnId = _executingTxn(*dbTxn); + if (txnId != 0) { + // Get the req kind + { + auto k = U64Key::Static(txnId); + std::string reqV; + ROCKS_DB_CHECKED(dbTxn->Get({}, _reqQueueCf, k.toSlice(), &reqV)); + BincodeBuf bbuf(reqV); + resp.executingTxnKind = (CDCMessageKind)bbuf.unpackScalar(); + } + // Get the step + { + std::string txnStateV; + ROCKS_DB_CHECKED(dbTxn->Get({}, _defaultCf, cdcMetadataKey(&EXECUTING_TXN_STATE_KEY), &txnStateV)); + ExternalValue txnState(txnStateV); + resp.executingTxnStep = txnState().step(); + } + } + resp.queuedTxns = _lastTxnInQueue(*dbTxn) - _firstTxnInQueue(*dbTxn); + } }; CDCDB::CDCDB(Logger& logger, const std::string& path) { @@ -1408,3 +1524,7 @@ void CDCDB::startNextTransaction(bool sync, EggsTime time, uint64_t logIndex, CD uint64_t CDCDB::lastAppliedLogEntry() { return ((CDCDBImpl*)_impl)->_lastAppliedLogEntry(); } + +void CDCDB::status(CDCStatus& resp) { + return ((CDCDBImpl*)_impl)->status(resp); +} \ No newline at end of file diff --git a/cpp/cdc/CDCDB.hpp b/cpp/cdc/CDCDB.hpp index dc70b155..137283c3 100644 --- a/cpp/cdc/CDCDB.hpp +++ b/cpp/cdc/CDCDB.hpp @@ -1,12 +1,9 @@ #pragma once -#include -#include - #include "Bincode.hpp" #include "Msgs.hpp" #include "Env.hpp" -#include "MsgsGen.hpp" +#include "Shuckle.hpp" struct CDCShardReq { ShardId shid; @@ -86,7 +83,8 @@ public: // Advances the CDC state using the given shard response. // - // This function crashes hard if the caller passes it a response it's not expecting. + // This function crashes hard if the caller passes it a response it's not expecting. So the caller + // should track responses and make sure only the correct one is passed in. void processShardResp( bool sync, // Whether to persist synchronously. Unneeded if log entries are persisted already. EggsTime time, @@ -97,7 +95,9 @@ public: CDCStep& step ); - // Starts the next transaction in line (if any). + // Does what it can to advance the state of the system, by starting the next + // transaction in line (if any). In any case, it returns what there is to do next + // in `step`. // // It's fine to call this function even if there's nothing to do -- and in fact // you should do that when starting up the CDC, to make sure to finish @@ -111,4 +111,6 @@ public: // The index of the last log entry persisted to the DB uint64_t lastAppliedLogEntry(); + + void status(CDCStatus& resp); }; \ No newline at end of file diff --git a/cpp/core/MsgsGen.cpp b/cpp/core/MsgsGen.cpp index 9e274066..573d4de6 100644 --- a/cpp/core/MsgsGen.cpp +++ b/cpp/core/MsgsGen.cpp @@ -161,6 +161,174 @@ std::ostream& operator<<(std::ostream& out, EggsError err) { return out; } +std::ostream& operator<<(std::ostream& out, ShardMessageKind kind) { + switch (kind) { + case ShardMessageKind::LOOKUP: + out << "LOOKUP"; + break; + case ShardMessageKind::STAT_FILE: + out << "STAT_FILE"; + break; + case ShardMessageKind::STAT_TRANSIENT_FILE: + out << "STAT_TRANSIENT_FILE"; + break; + case ShardMessageKind::STAT_DIRECTORY: + out << "STAT_DIRECTORY"; + break; + case ShardMessageKind::READ_DIR: + out << "READ_DIR"; + break; + case ShardMessageKind::CONSTRUCT_FILE: + out << "CONSTRUCT_FILE"; + break; + case ShardMessageKind::ADD_SPAN_INITIATE: + out << "ADD_SPAN_INITIATE"; + break; + case ShardMessageKind::ADD_SPAN_CERTIFY: + out << "ADD_SPAN_CERTIFY"; + break; + case ShardMessageKind::LINK_FILE: + out << "LINK_FILE"; + break; + case ShardMessageKind::SOFT_UNLINK_FILE: + out << "SOFT_UNLINK_FILE"; + break; + case ShardMessageKind::FILE_SPANS: + out << "FILE_SPANS"; + break; + case ShardMessageKind::SAME_DIRECTORY_RENAME: + out << "SAME_DIRECTORY_RENAME"; + break; + case ShardMessageKind::SET_DIRECTORY_INFO: + out << "SET_DIRECTORY_INFO"; + break; + case ShardMessageKind::SNAPSHOT_LOOKUP: + out << "SNAPSHOT_LOOKUP"; + break; + case ShardMessageKind::EXPIRE_TRANSIENT_FILE: + out << "EXPIRE_TRANSIENT_FILE"; + break; + case ShardMessageKind::VISIT_DIRECTORIES: + out << "VISIT_DIRECTORIES"; + break; + case ShardMessageKind::VISIT_FILES: + out << "VISIT_FILES"; + break; + case ShardMessageKind::VISIT_TRANSIENT_FILES: + out << "VISIT_TRANSIENT_FILES"; + break; + case ShardMessageKind::FULL_READ_DIR: + out << "FULL_READ_DIR"; + break; + case ShardMessageKind::REMOVE_NON_OWNED_EDGE: + out << "REMOVE_NON_OWNED_EDGE"; + break; + case ShardMessageKind::SAME_SHARD_HARD_FILE_UNLINK: + out << "SAME_SHARD_HARD_FILE_UNLINK"; + break; + case ShardMessageKind::REMOVE_SPAN_INITIATE: + out << "REMOVE_SPAN_INITIATE"; + break; + case ShardMessageKind::REMOVE_SPAN_CERTIFY: + out << "REMOVE_SPAN_CERTIFY"; + break; + case ShardMessageKind::SWAP_BLOCKS: + out << "SWAP_BLOCKS"; + break; + case ShardMessageKind::BLOCK_SERVICE_FILES: + out << "BLOCK_SERVICE_FILES"; + break; + case ShardMessageKind::REMOVE_INODE: + out << "REMOVE_INODE"; + break; + case ShardMessageKind::CREATE_DIRECTORY_INODE: + out << "CREATE_DIRECTORY_INODE"; + break; + case ShardMessageKind::SET_DIRECTORY_OWNER: + out << "SET_DIRECTORY_OWNER"; + break; + case ShardMessageKind::REMOVE_DIRECTORY_OWNER: + out << "REMOVE_DIRECTORY_OWNER"; + break; + case ShardMessageKind::CREATE_LOCKED_CURRENT_EDGE: + out << "CREATE_LOCKED_CURRENT_EDGE"; + break; + case ShardMessageKind::LOCK_CURRENT_EDGE: + out << "LOCK_CURRENT_EDGE"; + break; + case ShardMessageKind::UNLOCK_CURRENT_EDGE: + out << "UNLOCK_CURRENT_EDGE"; + break; + case ShardMessageKind::REMOVE_OWNED_SNAPSHOT_FILE_EDGE: + out << "REMOVE_OWNED_SNAPSHOT_FILE_EDGE"; + break; + case ShardMessageKind::MAKE_FILE_TRANSIENT: + out << "MAKE_FILE_TRANSIENT"; + break; + default: + out << "ShardMessageKind(" << ((int)kind) << ")"; + break; + } + return out; +} + +std::ostream& operator<<(std::ostream& out, CDCMessageKind kind) { + switch (kind) { + case CDCMessageKind::MAKE_DIRECTORY: + out << "MAKE_DIRECTORY"; + break; + case CDCMessageKind::RENAME_FILE: + out << "RENAME_FILE"; + break; + case CDCMessageKind::SOFT_UNLINK_DIRECTORY: + out << "SOFT_UNLINK_DIRECTORY"; + break; + case CDCMessageKind::RENAME_DIRECTORY: + out << "RENAME_DIRECTORY"; + break; + case CDCMessageKind::HARD_UNLINK_DIRECTORY: + out << "HARD_UNLINK_DIRECTORY"; + break; + case CDCMessageKind::CROSS_SHARD_HARD_UNLINK_FILE: + out << "CROSS_SHARD_HARD_UNLINK_FILE"; + break; + default: + out << "CDCMessageKind(" << ((int)kind) << ")"; + break; + } + return out; +} + +std::ostream& operator<<(std::ostream& out, ShuckleMessageKind kind) { + switch (kind) { + case ShuckleMessageKind::BLOCK_SERVICES_FOR_SHARD: + out << "BLOCK_SERVICES_FOR_SHARD"; + break; + case ShuckleMessageKind::REGISTER_BLOCK_SERVICES: + out << "REGISTER_BLOCK_SERVICES"; + break; + case ShuckleMessageKind::SHARDS: + out << "SHARDS"; + break; + case ShuckleMessageKind::REGISTER_SHARD: + out << "REGISTER_SHARD"; + break; + case ShuckleMessageKind::ALL_BLOCK_SERVICES: + out << "ALL_BLOCK_SERVICES"; + break; + case ShuckleMessageKind::REGISTER_CDC: + out << "REGISTER_CDC"; + break; + case ShuckleMessageKind::CDC: + out << "CDC"; + break; + default: + out << "ShuckleMessageKind(" << ((int)kind) << ")"; + break; + } + return out; +} + void TransientFile::pack(BincodeBuf& buf) const { id.pack(buf); buf.packFixedBytes<8>(cookie); @@ -2662,22 +2830,34 @@ std::ostream& operator<<(std::ostream& out, const AllBlockServicesResp& x) { void RegisterCdcReq::pack(BincodeBuf& buf) const { buf.packFixedBytes<4>(ip); buf.packScalar(port); + buf.packScalar(currentTransactionKind); + buf.packScalar(currentTransactionStep); + buf.packScalar(queuedTransactions); } void RegisterCdcReq::unpack(BincodeBuf& buf) { buf.unpackFixedBytes<4>(ip); port = buf.unpackScalar(); + currentTransactionKind = buf.unpackScalar(); + currentTransactionStep = buf.unpackScalar(); + queuedTransactions = buf.unpackScalar(); } void RegisterCdcReq::clear() { ip.clear(); port = uint16_t(0); + currentTransactionKind = CDCMessageKind(0); + currentTransactionStep = uint8_t(0); + queuedTransactions = uint64_t(0); } bool RegisterCdcReq::operator==(const RegisterCdcReq& rhs) const { if (ip != rhs.ip) { return false; }; if ((uint16_t)this->port != (uint16_t)rhs.port) { return false; }; + if ((CDCMessageKind)this->currentTransactionKind != (CDCMessageKind)rhs.currentTransactionKind) { return false; }; + if ((uint8_t)this->currentTransactionStep != (uint8_t)rhs.currentTransactionStep) { return false; }; + if ((uint64_t)this->queuedTransactions != (uint64_t)rhs.queuedTransactions) { return false; }; return true; } std::ostream& operator<<(std::ostream& out, const RegisterCdcReq& x) { - out << "RegisterCdcReq(" << "Ip=" << x.ip << ", " << "Port=" << x.port << ")"; + out << "RegisterCdcReq(" << "Ip=" << x.ip << ", " << "Port=" << x.port << ", " << "CurrentTransactionKind=" << x.currentTransactionKind << ", " << "CurrentTransactionStep=" << (int)x.currentTransactionStep << ", " << "QueuedTransactions=" << x.queuedTransactions << ")"; return out; } @@ -2735,117 +2915,6 @@ std::ostream& operator<<(std::ostream& out, const CdcResp& x) { return out; } -std::ostream& operator<<(std::ostream& out, ShardMessageKind kind) { - switch (kind) { - case ShardMessageKind::LOOKUP: - out << "LOOKUP"; - break; - case ShardMessageKind::STAT_FILE: - out << "STAT_FILE"; - break; - case ShardMessageKind::STAT_TRANSIENT_FILE: - out << "STAT_TRANSIENT_FILE"; - break; - case ShardMessageKind::STAT_DIRECTORY: - out << "STAT_DIRECTORY"; - break; - case ShardMessageKind::READ_DIR: - out << "READ_DIR"; - break; - case ShardMessageKind::CONSTRUCT_FILE: - out << "CONSTRUCT_FILE"; - break; - case ShardMessageKind::ADD_SPAN_INITIATE: - out << "ADD_SPAN_INITIATE"; - break; - case ShardMessageKind::ADD_SPAN_CERTIFY: - out << "ADD_SPAN_CERTIFY"; - break; - case ShardMessageKind::LINK_FILE: - out << "LINK_FILE"; - break; - case ShardMessageKind::SOFT_UNLINK_FILE: - out << "SOFT_UNLINK_FILE"; - break; - case ShardMessageKind::FILE_SPANS: - out << "FILE_SPANS"; - break; - case ShardMessageKind::SAME_DIRECTORY_RENAME: - out << "SAME_DIRECTORY_RENAME"; - break; - case ShardMessageKind::SET_DIRECTORY_INFO: - out << "SET_DIRECTORY_INFO"; - break; - case ShardMessageKind::SNAPSHOT_LOOKUP: - out << "SNAPSHOT_LOOKUP"; - break; - case ShardMessageKind::EXPIRE_TRANSIENT_FILE: - out << "EXPIRE_TRANSIENT_FILE"; - break; - case ShardMessageKind::VISIT_DIRECTORIES: - out << "VISIT_DIRECTORIES"; - break; - case ShardMessageKind::VISIT_FILES: - out << "VISIT_FILES"; - break; - case ShardMessageKind::VISIT_TRANSIENT_FILES: - out << "VISIT_TRANSIENT_FILES"; - break; - case ShardMessageKind::FULL_READ_DIR: - out << "FULL_READ_DIR"; - break; - case ShardMessageKind::REMOVE_NON_OWNED_EDGE: - out << "REMOVE_NON_OWNED_EDGE"; - break; - case ShardMessageKind::SAME_SHARD_HARD_FILE_UNLINK: - out << "SAME_SHARD_HARD_FILE_UNLINK"; - break; - case ShardMessageKind::REMOVE_SPAN_INITIATE: - out << "REMOVE_SPAN_INITIATE"; - break; - case ShardMessageKind::REMOVE_SPAN_CERTIFY: - out << "REMOVE_SPAN_CERTIFY"; - break; - case ShardMessageKind::SWAP_BLOCKS: - out << "SWAP_BLOCKS"; - break; - case ShardMessageKind::BLOCK_SERVICE_FILES: - out << "BLOCK_SERVICE_FILES"; - break; - case ShardMessageKind::REMOVE_INODE: - out << "REMOVE_INODE"; - break; - case ShardMessageKind::CREATE_DIRECTORY_INODE: - out << "CREATE_DIRECTORY_INODE"; - break; - case ShardMessageKind::SET_DIRECTORY_OWNER: - out << "SET_DIRECTORY_OWNER"; - break; - case ShardMessageKind::REMOVE_DIRECTORY_OWNER: - out << "REMOVE_DIRECTORY_OWNER"; - break; - case ShardMessageKind::CREATE_LOCKED_CURRENT_EDGE: - out << "CREATE_LOCKED_CURRENT_EDGE"; - break; - case ShardMessageKind::LOCK_CURRENT_EDGE: - out << "LOCK_CURRENT_EDGE"; - break; - case ShardMessageKind::UNLOCK_CURRENT_EDGE: - out << "UNLOCK_CURRENT_EDGE"; - break; - case ShardMessageKind::REMOVE_OWNED_SNAPSHOT_FILE_EDGE: - out << "REMOVE_OWNED_SNAPSHOT_FILE_EDGE"; - break; - case ShardMessageKind::MAKE_FILE_TRANSIENT: - out << "MAKE_FILE_TRANSIENT"; - break; - default: - out << "ShardMessageKind(" << ((int)kind) << ")"; - break; - } - return out; -} - const LookupReq& ShardReqContainer::getLookup() const { ALWAYS_ASSERT(_kind == ShardMessageKind::LOOKUP, "%s != %s", _kind, ShardMessageKind::LOOKUP); return std::get<0>(_data); @@ -4334,33 +4403,6 @@ std::ostream& operator<<(std::ostream& out, const ShardRespContainer& x) { return out; } -std::ostream& operator<<(std::ostream& out, CDCMessageKind kind) { - switch (kind) { - case CDCMessageKind::MAKE_DIRECTORY: - out << "MAKE_DIRECTORY"; - break; - case CDCMessageKind::RENAME_FILE: - out << "RENAME_FILE"; - break; - case CDCMessageKind::SOFT_UNLINK_DIRECTORY: - out << "SOFT_UNLINK_DIRECTORY"; - break; - case CDCMessageKind::RENAME_DIRECTORY: - out << "RENAME_DIRECTORY"; - break; - case CDCMessageKind::HARD_UNLINK_DIRECTORY: - out << "HARD_UNLINK_DIRECTORY"; - break; - case CDCMessageKind::CROSS_SHARD_HARD_UNLINK_FILE: - out << "CROSS_SHARD_HARD_UNLINK_FILE"; - break; - default: - out << "CDCMessageKind(" << ((int)kind) << ")"; - break; - } - return out; -} - const MakeDirectoryReq& CDCReqContainer::getMakeDirectory() const { ALWAYS_ASSERT(_kind == CDCMessageKind::MAKE_DIRECTORY, "%s != %s", _kind, CDCMessageKind::MAKE_DIRECTORY); return std::get<0>(_data); @@ -4673,36 +4715,6 @@ std::ostream& operator<<(std::ostream& out, const CDCRespContainer& x) { return out; } -std::ostream& operator<<(std::ostream& out, ShuckleMessageKind kind) { - switch (kind) { - case ShuckleMessageKind::BLOCK_SERVICES_FOR_SHARD: - out << "BLOCK_SERVICES_FOR_SHARD"; - break; - case ShuckleMessageKind::REGISTER_BLOCK_SERVICES: - out << "REGISTER_BLOCK_SERVICES"; - break; - case ShuckleMessageKind::SHARDS: - out << "SHARDS"; - break; - case ShuckleMessageKind::REGISTER_SHARD: - out << "REGISTER_SHARD"; - break; - case ShuckleMessageKind::ALL_BLOCK_SERVICES: - out << "ALL_BLOCK_SERVICES"; - break; - case ShuckleMessageKind::REGISTER_CDC: - out << "REGISTER_CDC"; - break; - case ShuckleMessageKind::CDC: - out << "CDC"; - break; - default: - out << "ShuckleMessageKind(" << ((int)kind) << ")"; - break; - } - return out; -} - const BlockServicesForShardReq& ShuckleReqContainer::getBlockServicesForShard() const { ALWAYS_ASSERT(_kind == ShuckleMessageKind::BLOCK_SERVICES_FOR_SHARD, "%s != %s", _kind, ShuckleMessageKind::BLOCK_SERVICES_FOR_SHARD); return std::get<0>(_data); diff --git a/cpp/core/MsgsGen.hpp b/cpp/core/MsgsGen.hpp index 91b1b975..b6d95bfe 100644 --- a/cpp/core/MsgsGen.hpp +++ b/cpp/core/MsgsGen.hpp @@ -58,6 +58,71 @@ enum class EggsError : uint16_t { std::ostream& operator<<(std::ostream& out, EggsError err); +enum class ShardMessageKind : uint8_t { + ERROR = 0, + LOOKUP = 1, + STAT_FILE = 2, + STAT_TRANSIENT_FILE = 10, + STAT_DIRECTORY = 8, + READ_DIR = 3, + CONSTRUCT_FILE = 4, + ADD_SPAN_INITIATE = 5, + ADD_SPAN_CERTIFY = 6, + LINK_FILE = 7, + SOFT_UNLINK_FILE = 12, + FILE_SPANS = 13, + SAME_DIRECTORY_RENAME = 14, + SET_DIRECTORY_INFO = 15, + SNAPSHOT_LOOKUP = 9, + EXPIRE_TRANSIENT_FILE = 11, + VISIT_DIRECTORIES = 21, + VISIT_FILES = 32, + VISIT_TRANSIENT_FILES = 22, + FULL_READ_DIR = 33, + REMOVE_NON_OWNED_EDGE = 23, + SAME_SHARD_HARD_FILE_UNLINK = 24, + REMOVE_SPAN_INITIATE = 25, + REMOVE_SPAN_CERTIFY = 26, + SWAP_BLOCKS = 34, + BLOCK_SERVICE_FILES = 35, + REMOVE_INODE = 36, + CREATE_DIRECTORY_INODE = 128, + SET_DIRECTORY_OWNER = 129, + REMOVE_DIRECTORY_OWNER = 137, + CREATE_LOCKED_CURRENT_EDGE = 130, + LOCK_CURRENT_EDGE = 131, + UNLOCK_CURRENT_EDGE = 132, + REMOVE_OWNED_SNAPSHOT_FILE_EDGE = 134, + MAKE_FILE_TRANSIENT = 135, +}; + +std::ostream& operator<<(std::ostream& out, ShardMessageKind kind); + +enum class CDCMessageKind : uint8_t { + ERROR = 0, + MAKE_DIRECTORY = 1, + RENAME_FILE = 2, + SOFT_UNLINK_DIRECTORY = 3, + RENAME_DIRECTORY = 4, + HARD_UNLINK_DIRECTORY = 5, + CROSS_SHARD_HARD_UNLINK_FILE = 6, +}; + +std::ostream& operator<<(std::ostream& out, CDCMessageKind kind); + +enum class ShuckleMessageKind : uint8_t { + ERROR = 0, + BLOCK_SERVICES_FOR_SHARD = 1, + REGISTER_BLOCK_SERVICES = 2, + SHARDS = 3, + REGISTER_SHARD = 4, + ALL_BLOCK_SERVICES = 5, + REGISTER_CDC = 6, + CDC = 7, +}; + +std::ostream& operator<<(std::ostream& out, ShuckleMessageKind kind); + struct TransientFile { InodeId id; BincodeFixedBytes<8> cookie; @@ -2400,14 +2465,20 @@ std::ostream& operator<<(std::ostream& out, const AllBlockServicesResp& x); struct RegisterCdcReq { BincodeFixedBytes<4> ip; uint16_t port; + CDCMessageKind currentTransactionKind; + uint8_t currentTransactionStep; + uint64_t queuedTransactions; - static constexpr uint16_t STATIC_SIZE = BincodeFixedBytes<4>::STATIC_SIZE + 2; // ip + port + static constexpr uint16_t STATIC_SIZE = BincodeFixedBytes<4>::STATIC_SIZE + 2 + 1 + 1 + 8; // ip + port + currentTransactionKind + currentTransactionStep + queuedTransactions RegisterCdcReq() { clear(); } uint16_t packedSize() const { uint16_t _size = 0; _size += BincodeFixedBytes<4>::STATIC_SIZE; // ip _size += 2; // port + _size += 1; // currentTransactionKind + _size += 1; // currentTransactionStep + _size += 8; // queuedTransactions return _size; } void pack(BincodeBuf& buf) const; @@ -2475,46 +2546,6 @@ struct CdcResp { std::ostream& operator<<(std::ostream& out, const CdcResp& x); -enum class ShardMessageKind : uint8_t { - ERROR = 0, - LOOKUP = 1, - STAT_FILE = 2, - STAT_TRANSIENT_FILE = 10, - STAT_DIRECTORY = 8, - READ_DIR = 3, - CONSTRUCT_FILE = 4, - ADD_SPAN_INITIATE = 5, - ADD_SPAN_CERTIFY = 6, - LINK_FILE = 7, - SOFT_UNLINK_FILE = 12, - FILE_SPANS = 13, - SAME_DIRECTORY_RENAME = 14, - SET_DIRECTORY_INFO = 15, - SNAPSHOT_LOOKUP = 9, - EXPIRE_TRANSIENT_FILE = 11, - VISIT_DIRECTORIES = 21, - VISIT_FILES = 32, - VISIT_TRANSIENT_FILES = 22, - FULL_READ_DIR = 33, - REMOVE_NON_OWNED_EDGE = 23, - SAME_SHARD_HARD_FILE_UNLINK = 24, - REMOVE_SPAN_INITIATE = 25, - REMOVE_SPAN_CERTIFY = 26, - SWAP_BLOCKS = 34, - BLOCK_SERVICE_FILES = 35, - REMOVE_INODE = 36, - CREATE_DIRECTORY_INODE = 128, - SET_DIRECTORY_OWNER = 129, - REMOVE_DIRECTORY_OWNER = 137, - CREATE_LOCKED_CURRENT_EDGE = 130, - LOCK_CURRENT_EDGE = 131, - UNLOCK_CURRENT_EDGE = 132, - REMOVE_OWNED_SNAPSHOT_FILE_EDGE = 134, - MAKE_FILE_TRANSIENT = 135, -}; - -std::ostream& operator<<(std::ostream& out, ShardMessageKind kind); - struct ShardReqContainer { private: ShardMessageKind _kind = (ShardMessageKind)0; @@ -2683,18 +2714,6 @@ public: std::ostream& operator<<(std::ostream& out, const ShardRespContainer& x); -enum class CDCMessageKind : uint8_t { - ERROR = 0, - MAKE_DIRECTORY = 1, - RENAME_FILE = 2, - SOFT_UNLINK_DIRECTORY = 3, - RENAME_DIRECTORY = 4, - HARD_UNLINK_DIRECTORY = 5, - CROSS_SHARD_HARD_UNLINK_FILE = 6, -}; - -std::ostream& operator<<(std::ostream& out, CDCMessageKind kind); - struct CDCReqContainer { private: CDCMessageKind _kind = (CDCMessageKind)0; @@ -2751,19 +2770,6 @@ public: std::ostream& operator<<(std::ostream& out, const CDCRespContainer& x); -enum class ShuckleMessageKind : uint8_t { - ERROR = 0, - BLOCK_SERVICES_FOR_SHARD = 1, - REGISTER_BLOCK_SERVICES = 2, - SHARDS = 3, - REGISTER_SHARD = 4, - ALL_BLOCK_SERVICES = 5, - REGISTER_CDC = 6, - CDC = 7, -}; - -std::ostream& operator<<(std::ostream& out, ShuckleMessageKind kind); - struct ShuckleReqContainer { private: ShuckleMessageKind _kind = (ShuckleMessageKind)0; diff --git a/cpp/core/Shuckle.cpp b/cpp/core/Shuckle.cpp index 0afa08a0..a70fd6d3 100644 --- a/cpp/core/Shuckle.cpp +++ b/cpp/core/Shuckle.cpp @@ -17,7 +17,6 @@ #include "Bincode.hpp" #include "Env.hpp" #include "Msgs.hpp" -#include "MsgsGen.hpp" #include "Shuckle.hpp" #include "Exception.hpp" @@ -214,7 +213,7 @@ std::string registerShard( return {}; } -std::string registerCDC(const std::string& host, uint16_t port, Duration timeout, const std::array& cdcAddr, uint16_t cdcPort) { +std::string registerCDC(const std::string& host, uint16_t port, Duration timeout, const std::array& cdcAddr, uint16_t cdcPort, const CDCStatus& status) { std::string errString; auto sock = shuckleSock(host, port, timeout, errString); if (sock.fd < 0) { @@ -225,6 +224,9 @@ std::string registerCDC(const std::string& host, uint16_t port, Duration timeout auto& req = reqContainer.setRegisterCdc(); req.ip.data = cdcAddr; req.port = cdcPort; + req.currentTransactionKind = status.executingTxnKind; + req.currentTransactionStep = status.executingTxnStep; + req.queuedTransactions = status.queuedTxns; errString = writeShuckleRequest(sock.fd, reqContainer); if (!errString.empty()) { return errString; diff --git a/cpp/core/Shuckle.hpp b/cpp/core/Shuckle.hpp index a7d53061..4259bda6 100644 --- a/cpp/core/Shuckle.hpp +++ b/cpp/core/Shuckle.hpp @@ -1,7 +1,6 @@ #pragma once #include "Msgs.hpp" -#include "MsgsGen.hpp" // The host here is the scheme + host + port, e.g. `http://localhost:5000`. // @@ -27,12 +26,20 @@ std::string registerShard( uint16_t shardPort ); +struct CDCStatus { + uint64_t queuedTxns; + // if non-zero, the following field is set too + CDCMessageKind executingTxnKind; + uint8_t executingTxnStep; +}; + std::string registerCDC( const std::string& shuckleHost, uint16_t shucklePort, Duration timeout, const std::array& cdcAddr, - uint16_t cdcPort + uint16_t cdcPort, + const CDCStatus& status ); std::string fetchShards( diff --git a/cpp/shard/Shard.cpp b/cpp/shard/Shard.cpp index 99d09395..ea3d854f 100644 --- a/cpp/shard/Shard.cpp +++ b/cpp/shard/Shard.cpp @@ -13,7 +13,6 @@ #include "Crypto.hpp" #include "Exception.hpp" #include "Msgs.hpp" -#include "MsgsGen.hpp" #include "Shard.hpp" #include "Env.hpp" #include "ShardDB.hpp" diff --git a/cpp/shard/ShardDB.cpp b/cpp/shard/ShardDB.cpp index 6c2c9bfb..3b5485be 100644 --- a/cpp/shard/ShardDB.cpp +++ b/cpp/shard/ShardDB.cpp @@ -17,7 +17,6 @@ #include "Assert.hpp" #include "Exception.hpp" #include "Msgs.hpp" -#include "MsgsGen.hpp" #include "Time.hpp" #include "RocksDBUtils.hpp" #include "ShardDBData.hpp" @@ -2097,7 +2096,7 @@ struct ShardDBImpl { } EggsError _applyRemoveFileInode(EggsTime time, rocksdb::WriteBatch& batch, const RemoveInodeEntry& entry, RemoveInodeResp& resp) { - ALWAYS_ASSERT(entry.id.type() == InodeType::FILE || entry.id.type() == InodeType::DIRECTORY); + ALWAYS_ASSERT(entry.id.type() == InodeType::FILE || entry.id.type() == InodeType::SYMLINK); // we demand for the file to be transient, for the deadline to have passed, and for it to have // no spans diff --git a/cpp/shard/ShardDBData.hpp b/cpp/shard/ShardDBData.hpp index 82543855..2f9b135f 100644 --- a/cpp/shard/ShardDBData.hpp +++ b/cpp/shard/ShardDBData.hpp @@ -8,7 +8,6 @@ #include "Common.hpp" #include "Bincode.hpp" #include "Msgs.hpp" -#include "MsgsGen.hpp" #include "Time.hpp" #include "RocksDBUtils.hpp" diff --git a/go/bincodegen/bincodegen.go b/go/bincodegen/bincodegen.go index 728d350e..cd1fa12a 100644 --- a/go/bincodegen/bincodegen.go +++ b/go/bincodegen/bincodegen.go @@ -349,7 +349,7 @@ func generateC(errors []string, shardReqResps []reqRespType, cdcReqResps []reqRe } func cppType(t reflect.Type) string { - if t.Name() == "InodeId" || t.Name() == "InodeIdExtra" || t.Name() == "Parity" || t.Name() == "EggsTime" || t.Name() == "ShardId" { + if t.Name() == "InodeId" || t.Name() == "InodeIdExtra" || t.Name() == "Parity" || t.Name() == "EggsTime" || t.Name() == "ShardId" || t.Name() == "CDCMessageKind" { return t.Name() } switch t.Kind() { @@ -727,7 +727,6 @@ func generateCppLogEntries(hpp io.Writer, cpp io.Writer, what string, types []re } func generateCppReqResp(hpp io.Writer, cpp io.Writer, what string, reqResps []reqRespType) { - generateCppKind(hpp, cpp, what, reqResps) reqContainerTypes := make([]containerType, len(reqResps)) for i, reqResp := range reqResps { reqContainerTypes[i] = containerType{ @@ -783,6 +782,10 @@ func generateCpp(errors []string, shardReqResps []reqRespType, cdcReqResps []req fmt.Fprintf(cppOut, " return out;\n") fmt.Fprintf(cppOut, "}\n\n") + generateCppKind(hppOut, cppOut, "Shard", shardReqResps) + generateCppKind(hppOut, cppOut, "CDC", cdcReqResps) + generateCppKind(hppOut, cppOut, "Shuckle", shuckleReqResps) + for _, typ := range extras { generateCppSingle(hppOut, cppOut, typ) } diff --git a/go/eggs/client.go b/go/eggs/client.go index a173ae39..5ea784df 100644 --- a/go/eggs/client.go +++ b/go/eggs/client.go @@ -1,6 +1,7 @@ package eggs import ( + "bytes" "crypto/cipher" "fmt" "net" @@ -27,6 +28,71 @@ type ClientCounters struct { CDC [10]ReqCounters } +func (counters *ClientCounters) Log(log *Logger) { + formatCounters := func(c *ReqCounters) { + totalCount := uint64(0) + for i := 0; i < c.Timings.Buckets(); i++ { + _, count, _ := c.Timings.Bucket(i) + totalCount += count + } + log.Info(" count: %v", totalCount) + if totalCount == 0 { + log.Info(" attempts: %v", c.Attempts) + } else { + log.Info(" attempts: %v (%v)", c.Attempts, float64(c.Attempts)/float64(totalCount)) + } + log.Info(" total time: %v", time.Duration(c.Timings.TotalTime())) + log.Info(" avg time: %v", time.Duration(uint64(c.Timings.TotalTime())/totalCount)) + hist := bytes.NewBuffer([]byte{}) + first := true + countSoFar := uint64(0) + for i := 0; i < c.Timings.Buckets(); i++ { + lowerBound, count, upperBound := c.Timings.Bucket(i) + if count == 0 { + continue + } + countSoFar += count + if first { + fmt.Fprintf(hist, "%v < ", lowerBound) + } else { + fmt.Fprintf(hist, ", ") + } + first = false + fmt.Fprintf(hist, "%v (%0.2f%%) < %v", count, float64(countSoFar*100)/float64(totalCount), upperBound) + } + log.Info(" hist: %v", hist.String()) + } + var shardTime time.Duration + for i := 0; i < len(counters.Shard[:]); i++ { + shardTime += counters.Shard[i].Timings.TotalTime() + } + log.Info("Shard stats (total shard time %v):", shardTime) + for i := 0; i < len(counters.Shard[:]); i++ { + c := &counters.Shard[i] + if c.Attempts == 0 { + continue + } + kind := msgs.ShardMessageKind(i) + log.Info(" %v", kind) + formatCounters(c) + } + var cdcTime time.Duration + for i := 0; i < len(counters.CDC[:]); i++ { + cdcTime += counters.CDC[i].Timings.TotalTime() + } + log.Info("CDC stats (total CDC time %v):", cdcTime) + for i := 0; i < len(counters.CDC[:]); i++ { + c := &counters.CDC[i] + if c.Attempts == 0 { + continue + } + kind := msgs.CDCMessageKind(i) + log.Info(" %v", kind) + formatCounters(c) + } + +} + type blockServiceConn struct { mu sync.Mutex lastActive time.Time diff --git a/go/eggs/gc.go b/go/eggs/gc.go index 7e12d48b..4c4470e8 100644 --- a/go/eggs/gc.go +++ b/go/eggs/gc.go @@ -1,6 +1,7 @@ package eggs import ( + "crypto/cipher" "fmt" "xtx/eggsfs/msgs" ) @@ -113,22 +114,41 @@ func destructFilesInternal( // Collects dead transient files, and expunges them. Stops when // all files have been traversed. Useful for testing a single iteration. -func DestructFiles( - log *Logger, shuckleAddress string, counters *ClientCounters, shid msgs.ShardId, blockService MockableBlockServices, +func destructFiles( + log *Logger, shuckleAddress string, counters *ClientCounters, shid msgs.ShardId, blockServiceKeys map[msgs.BlockServiceId]cipher.Block, ) error { + log.Info("starting to destruct files in shard %v", shid) client, err := NewClient(log, shuckleAddress, &shid, counters, nil) if err != nil { return err } defer client.Close() stats := DestructionStats{} - if err := destructFilesInternal(log, client, shid, &stats, blockService); err != nil { + var mbs MockableBlockServices + if blockServiceKeys == nil { + mbs = client + } else { + mbs = &MockedBlockServices{Keys: blockServiceKeys} + } + if err := destructFilesInternal(log, client, shid, &stats, mbs); err != nil { return err } log.Info("stats after one destruct files iteration: %+v", stats) return nil } +func DestructFiles( + log *Logger, shuckleAddress string, counters *ClientCounters, shid msgs.ShardId, +) error { + return destructFiles(log, shuckleAddress, counters, shid, nil) +} + +func DestructFilesMockedBlockServices( + log *Logger, shuckleAddress string, counters *ClientCounters, shid msgs.ShardId, blockServiceKeys map[msgs.BlockServiceId]cipher.Block, +) error { + return destructFiles(log, shuckleAddress, counters, shid, blockServiceKeys) +} + func DestructFilesInAllShards( log *Logger, shuckleAddress string, @@ -325,6 +345,7 @@ func collectDirectoriesInternal(log *Logger, client *Client, stats *CollectStats } func CollectDirectories(log *Logger, shuckleAddress string, counters *ClientCounters, shid msgs.ShardId) error { + log.Info("starting to collect directories in shard %v", shid) client, err := NewClient(log, shuckleAddress, &shid, counters, nil) if err != nil { return err diff --git a/go/eggsfuse/eggsfuse.go b/go/eggsfuse/eggsfuse.go index ed25a2f2..00ebbfc1 100644 --- a/go/eggsfuse/eggsfuse.go +++ b/go/eggsfuse/eggsfuse.go @@ -11,7 +11,6 @@ import ( "runtime/pprof" "sync" "syscall" - "time" "xtx/eggsfs/eggs" "xtx/eggsfs/msgs" @@ -863,63 +862,7 @@ func main() { go func() { for { <-statsChan - formatCounters := func(c *eggs.ReqCounters) { - totalCount := uint64(0) - for i := 0; i < c.Timings.Buckets(); i++ { - _, count, _ := c.Timings.Bucket(i) - totalCount += count - } - log.Info(" count: %v", totalCount) - log.Info(" attempts: %v (%v)", c.Attempts, float64(c.Attempts)/float64(totalCount)) - log.Info(" total time: %v", time.Duration(c.Timings.TotalTime())) - log.Info(" avg time: %v", time.Duration(uint64(c.Timings.TotalTime())/totalCount)) - hist := bytes.NewBuffer([]byte{}) - first := true - countSoFar := uint64(0) - for i := 0; i < c.Timings.Buckets(); i++ { - lowerBound, count, upperBound := c.Timings.Bucket(i) - if count == 0 { - continue - } - countSoFar += count - if first { - fmt.Fprintf(hist, "%v < ", lowerBound) - } else { - fmt.Fprintf(hist, ", ") - } - first = false - fmt.Fprintf(hist, "%v (%0.2f%%) < %v", count, float64(countSoFar*100)/float64(totalCount), upperBound) - } - log.Info(" hist: %v", hist.String()) - } - var shardTime time.Duration - for i := 0; i < len(counters.Shard[:]); i++ { - shardTime += counters.Shard[i].Timings.TotalTime() - } - log.Info("Shard stats (total shard time %v):", shardTime) - for i := 0; i < len(counters.Shard[:]); i++ { - c := &counters.Shard[i] - if c.Attempts == 0 { - continue - } - kind := msgs.ShardMessageKind(i) - log.Info(" %v", kind) - formatCounters(c) - } - var cdcTime time.Duration - for i := 0; i < len(counters.CDC[:]); i++ { - cdcTime += counters.CDC[i].Timings.TotalTime() - } - log.Info("CDC stats (total CDC time %v):", cdcTime) - for i := 0; i < len(counters.CDC[:]); i++ { - c := &counters.CDC[i] - if c.Attempts == 0 { - continue - } - kind := msgs.CDCMessageKind(i) - log.Info(" %v", kind) - formatCounters(c) - } + counters.Log(log) } }() } diff --git a/go/eggsshuckle/base.html b/go/eggsshuckle/base.html index cabfcf98..7e41ab3e 100644 --- a/go/eggsshuckle/base.html +++ b/go/eggsshuckle/base.html @@ -22,7 +22,7 @@ Overview diff --git a/go/eggsshuckle/directory.html b/go/eggsshuckle/directory.html index 80b1acea..8872e860 100644 --- a/go/eggsshuckle/directory.html +++ b/go/eggsshuckle/directory.html @@ -6,9 +6,9 @@ Path - / + / {{range $segment := .PathSegments}} - {{$segment.Segment}} / + {{$segment.Segment}} / {{end}} @@ -24,7 +24,7 @@ {{if eq .Owner "0x0"}} NONE {{else}} - {{.Owner}} + {{.Owner}} {{end}} @@ -40,7 +40,7 @@ {{if not (eq .Info.InheritedFrom "")}} Inherited from - {{.Info.InheritedFrom}} + {{.Info.InheritedFrom}} {{end}} @@ -113,9 +113,9 @@ {{$edge.Name}} {{else}} {{if and $edge.Current $.Path}} - {{$edge.Name}} + {{$edge.Name}} {{else}} - {{$edge.Name}} + {{$edge.Name}} {{end}} {{end}} diff --git a/go/eggsshuckle/eggsshuckle.go b/go/eggsshuckle/eggsshuckle.go index 345aa18a..ef8be032 100644 --- a/go/eggsshuckle/eggsshuckle.go +++ b/go/eggsshuckle/eggsshuckle.go @@ -45,13 +45,20 @@ func parseTemplates(ts ...namedTemplate) (tmpl *template.Template) { return tmpl } +type cdcState struct { + ip [4]byte + port uint16 + lastSeen msgs.EggsTime + queuedTxns uint64 + currentTxnKind msgs.CDCMessageKind + currentTxnStep uint8 +} + type state struct { mutex sync.RWMutex blockServices map[msgs.BlockServiceId]msgs.BlockServiceInfo shards [256]msgs.ShardInfo - cdcIp [4]byte - cdcPort uint16 - cdcLastSeen msgs.EggsTime + cdc cdcState } func newState() *state { @@ -133,8 +140,8 @@ func handleCdcReq(log *eggs.Logger, s *state, w io.Writer, req *msgs.CdcReq) *ms defer s.mutex.RUnlock() resp := msgs.CdcResp{} - resp.Ip = s.cdcIp - resp.Port = s.cdcPort + resp.Ip = s.cdc.ip + resp.Port = s.cdc.port return &resp } @@ -143,9 +150,12 @@ func handleRegisterCdcReq(log *eggs.Logger, s *state, w io.Writer, req *msgs.Reg s.mutex.Lock() defer s.mutex.Unlock() - s.cdcIp = req.Ip - s.cdcPort = req.Port - s.cdcLastSeen = msgs.Now() + s.cdc.ip = req.Ip + s.cdc.port = req.Port + s.cdc.currentTxnKind = req.CurrentTransactionKind + s.cdc.currentTxnStep = req.CurrentTransactionStep + s.cdc.queuedTxns = req.QueuedTransactions + s.cdc.lastSeen = msgs.Now() return &msgs.RegisterCdcResp{} } @@ -314,16 +324,19 @@ type indexShard struct { } type indexData struct { - NumBlockServices int - NumFailureDomains int - TotalCapacity string - TotalUsed string - TotalUsedPercentage string - CDCAddr string - CDCLastSeen string - BlockServices []indexBlockService - ShardsAddrs []indexShard - Blocks uint64 + NumBlockServices int + NumFailureDomains int + TotalCapacity string + TotalUsed string + TotalUsedPercentage string + CDCAddr string + CDCLastSeen string + CDCCurrentTransactionKind string + CDCCurrentTransactionStep string + CDCQueuedTransactions uint64 + BlockServices []indexBlockService + ShardsAddrs []indexShard + Blocks uint64 } //go:embed index.html @@ -410,8 +423,13 @@ func handleIndex(ll *eggs.Logger, state *state, w http.ResponseWriter, r *http.R formatLastSeen := func(t msgs.EggsTime) string { return formatNanos(uint64(now) - uint64(t)) } - data.CDCAddr = fmt.Sprintf("%v:%v", net.IP(state.cdcIp[:]), state.cdcPort) - data.CDCLastSeen = formatLastSeen(state.cdcLastSeen) + data.CDCAddr = fmt.Sprintf("%v:%v", net.IP(state.cdc.ip[:]), state.cdc.port) + data.CDCLastSeen = formatLastSeen(state.cdc.lastSeen) + data.CDCQueuedTransactions = state.cdc.queuedTxns + if state.cdc.currentTxnKind != 0 { + data.CDCCurrentTransactionKind = state.cdc.currentTxnKind.String() + data.CDCCurrentTransactionStep = fmt.Sprintf("%v", state.cdc.currentTxnStep) + } totalCapacityBytes := uint64(0) totalAvailableBytes := uint64(0) failureDomainsBytes := make(map[string]struct{}) @@ -553,7 +571,7 @@ func newClient(log *eggs.Logger, state *state) *eggs.Client { shardIps[i] = si.Ip shardPorts[i] = si.Port } - client, err := eggs.NewClientDirect(log, nil, nil, nil, state.cdcIp, state.cdcPort, &shardIps, &shardPorts) + client, err := eggs.NewClientDirect(log, nil, nil, nil, state.cdc.ip, state.cdc.port, &shardIps, &shardPorts) if err != nil { panic(err) } @@ -650,7 +668,7 @@ func handleInode( handlePage( log, w, r, func(query url.Values) (*template.Template, *pageData, int) { - path := r.URL.Path[len("/inode"):] + path := r.URL.Path[len("/browse"):] path = normalizePath(path) var id msgs.InodeId idStr := query.Get("id") @@ -917,6 +935,7 @@ func setupRouting(log *eggs.Logger, st *state) { "/shuckle-face.png", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "image/png") + w.Header().Set("Cache-Control", "max-age=300") w.Write(shuckleFacePngStr) }, ) @@ -942,7 +961,7 @@ func setupRouting(log *eggs.Logger, st *state) { namedTemplate{name: "base", body: baseTemplateStr}, namedTemplate{name: "directory", body: directoryTemplateStr}, ) - setupPage("/inode/", handleInode) + setupPage("/browse/", handleInode) } func main() { diff --git a/go/eggsshuckle/file.html b/go/eggsshuckle/file.html index b1c247b5..c2ce626b 100644 --- a/go/eggsshuckle/file.html +++ b/go/eggsshuckle/file.html @@ -6,12 +6,12 @@ Path - / + / {{range $segment := .PathSegments}} {{if eq $segment.PathSoFar $.Path}} - {{$segment.Segment}} + {{$segment.Segment}} {{else}} - {{$segment.Segment}} / + {{$segment.Segment}} / {{end}} {{end}} diff --git a/go/eggsshuckle/index.html b/go/eggsshuckle/index.html index 47eb56d5..e09c7d48 100644 --- a/go/eggsshuckle/index.html +++ b/go/eggsshuckle/index.html @@ -12,11 +12,15 @@ Address LastSeen + QueuedTransactions + CurrentTransaction {{.CDCAddr}} {{.CDCLastSeen}} + {{.CDCQueuedTransactions}} + {{.CDCCurrentTransactionKind}} {{.CDCCurrentTransactionStep}} diff --git a/go/gcdaemon/gcdaemon.go b/go/gcdaemon/gcdaemon.go deleted file mode 100644 index e7945cfb..00000000 --- a/go/gcdaemon/gcdaemon.go +++ /dev/null @@ -1,93 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "os" - "strconv" - "sync" - "xtx/eggsfs/eggs" - "xtx/eggsfs/managedroutine" - "xtx/eggsfs/msgs" -) - -func noRunawayArgs() { - if flag.NArg() > 0 { - fmt.Fprintf(os.Stderr, "Unexpected extra arguments %v\n", flag.Args()) - os.Exit(2) - } -} - -func main() { - verbose := flag.Bool("verbose", false, "Enables debug logging.") - singleIteration := flag.Bool("single-iteration", false, "Whether to run a single iteration of GC and terminate.") - logFile := flag.String("log-file", "", "File to log to, stdout if not provided.") - shuckleAddress := flag.String("shuckle", eggs.DEFAULT_SHUCKLE_ADDRESS, "Shuckle address (host:port).") - flag.Parse() - - if flag.NArg() < 1 { - fmt.Fprintf(os.Stderr, "Please specify at least one shard to run with using positional arguments.") - os.Exit(2) - } - - shards := []msgs.ShardId{} - for _, shardStr := range flag.Args() { - shardI, err := strconv.Atoi(shardStr) - if err != nil { - fmt.Fprintf(os.Stderr, "Invalid shard %v: %v", shardStr, err) - os.Exit(2) - } - if shardI < 0 || shardI > 255 { - fmt.Fprintf(os.Stderr, "Invalid shard %v", shardStr) - os.Exit(2) - } - shards = append(shards, msgs.ShardId(shardI)) - } - - logOut := os.Stdout - if *logFile != "" { - var err error - logOut, err = os.OpenFile(*logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - fmt.Fprintf(os.Stderr, "could not open file %v: %v", *logFile, err) - os.Exit(1) - } - defer logOut.Close() - } - log := eggs.NewLogger(*verbose, logOut) - - panicChan := make(chan error) - finishedChan := make(chan struct{}) - var wait sync.WaitGroup - wait.Add(len(shards)) - - managedRoutines := managedroutine.New(panicChan) - managedRoutines.Start( - "waiter", - func() { - wait.Wait() - finishedChan <- struct{}{} - }, - ) - for _, shard := range shards { - managedRoutines.Start( - fmt.Sprintf("shard%v", shard), - func() { - for { - eggs.CollectDirectories(log, *shuckleAddress, nil, shard) - if *singleIteration { - break - } - } - wait.Done() - }, - ) - } - - select { - case err := <-panicChan: - panic(err) - case <-finishedChan: - log.Info("finished collecting on all shards, terminating") - } -} diff --git a/go/msgs/msgs.go b/go/msgs/msgs.go index 5a6a52ee..e4406a27 100644 --- a/go/msgs/msgs.go +++ b/go/msgs/msgs.go @@ -1302,8 +1302,11 @@ type RegisterShardReq struct { type RegisterShardResp struct{} type RegisterCdcReq struct { - Ip [4]byte - Port uint16 + Ip [4]byte + Port uint16 + CurrentTransactionKind CDCMessageKind // if 0, nothing is executing + CurrentTransactionStep uint8 + QueuedTransactions uint64 } type RegisterCdcResp struct{} diff --git a/go/msgs/msgs_bincode.go b/go/msgs/msgs_bincode.go index 526d313b..fd2fc8fd 100644 --- a/go/msgs/msgs_bincode.go +++ b/go/msgs/msgs_bincode.go @@ -3433,6 +3433,15 @@ func (v *RegisterCdcReq) Pack(w io.Writer) error { if err := bincode.PackScalar(w, uint16(v.Port)); err != nil { return err } + if err := bincode.PackScalar(w, uint8(v.CurrentTransactionKind)); err != nil { + return err + } + if err := bincode.PackScalar(w, uint8(v.CurrentTransactionStep)); err != nil { + return err + } + if err := bincode.PackScalar(w, uint64(v.QueuedTransactions)); err != nil { + return err + } return nil } @@ -3443,6 +3452,15 @@ func (v *RegisterCdcReq) Unpack(r io.Reader) error { if err := bincode.UnpackScalar(r, (*uint16)(&v.Port)); err != nil { return err } + if err := bincode.UnpackScalar(r, (*uint8)(&v.CurrentTransactionKind)); err != nil { + return err + } + if err := bincode.UnpackScalar(r, (*uint8)(&v.CurrentTransactionStep)); err != nil { + return err + } + if err := bincode.UnpackScalar(r, (*uint64)(&v.QueuedTransactions)); err != nil { + return err + } return nil }