diff --git a/cpp/cdc/CDC.cpp b/cpp/cdc/CDC.cpp index 9f9d325d..a801ea5a 100644 --- a/cpp/cdc/CDC.cpp +++ b/cpp/cdc/CDC.cpp @@ -43,7 +43,6 @@ struct CDCShared { LogsDB& logsDB; std::array socks; std::atomic isLeader; - std::shared_ptr> replicas; std::mutex shardsMutex; std::array shards; // How long it took us to process the entire request, from parse to response. @@ -58,6 +57,14 @@ struct CDCShared { timingsTotal[(int)kind] = Timings::Standard(); } } + inline std::shared_ptr> getReplicas() { + return std::atomic_load(&_replicas); + } + inline void setReplicas(const std::array&& newReplicas) { + std::atomic_store(&_replicas, std::make_shared>(newReplicas)); + } +private: + std::shared_ptr> _replicas; }; struct InFlightShardRequest { @@ -265,7 +272,7 @@ private: std::vector _logsDBOutResponses; std::unordered_map _inFlightLogEntries; std::unordered_map> _logEntryIdxToReqInfos; - std::shared_ptr> _replicas; + std::shared_ptr> _replicas; public: CDCServer(Logger& logger, std::shared_ptr& xmon, CDCOptions& options, CDCShared& shared) : Loop(logger, xmon, "req_server"), @@ -303,7 +310,7 @@ public: _shardRespReqIds.clear(); _receivedResponses.clear(); _entryIdxToRespIds.clear(); - _replicas = _shared.replicas; + _replicas = _shared.getReplicas(); // Timeout ShardRequests @@ -580,7 +587,7 @@ private: return _cdcReqs.size() + _shardResps.size(); } - AddrsInfo* addressFromReplicaId(ReplicaId id) { + const AddrsInfo* addressFromReplicaId(ReplicaId id) { if (!_replicas) { return nullptr; } @@ -936,7 +943,8 @@ public: _env.updateAlert(_alert, "AddrsInfo in registry: %s , not matching local AddrsInfo: %s", replicas[_replicaId.u8], _shared.socks[CDC_SOCK].addr()); return false; } - if (unlikely(!_shared.replicas)) { + auto oldReplicas = _shared.getReplicas(); + if (unlikely(!oldReplicas)) { size_t emptyReplicas{0}; for (auto& replica : replicas) { if (replica.addrs[0].port == 0) { @@ -948,9 +956,9 @@ public: return false; } } - if (unlikely(!_shared.replicas || *_shared.replicas != replicas)) { + if (unlikely(!oldReplicas || *oldReplicas != replicas)) { LOG_DEBUG(_env, "Updating replicas to %s %s %s %s %s", replicas[0], replicas[1], replicas[2], replicas[3], replicas[4]); - std::atomic_exchange(&_shared.replicas, std::make_shared>(replicas)); + _shared.setReplicas(std::move(replicas)); } } return true; diff --git a/cpp/shard/Shard.cpp b/cpp/shard/Shard.cpp index 15e643a8..3dd93b57 100644 --- a/cpp/shard/Shard.cpp +++ b/cpp/shard/Shard.cpp @@ -101,10 +101,6 @@ struct ShardShared { ShardDB& shardDB; BlockServicesCacheDB& blockServicesCache; - // replication information - std::shared_ptr> replicas; // used for replicating shard within location - std::shared_ptr> leadersAtOtherLocations; // used for cross location replication - // statistics std::array timings; std::array errors; @@ -146,8 +142,6 @@ struct ShardShared { logsDB(logsDB_), shardDB(shardDB_), blockServicesCache(blockServicesCache_), - replicas(nullptr), - leadersAtOtherLocations(std::make_shared>()), receivedLogsDBReqs(0), receivedLogsDBResps(0), receivedProxyLogsDBReqs(0), @@ -163,7 +157,9 @@ struct ShardShared { droppedProxyWriteResps(0), droppedReadReqs(0), isInitiated(false), - isBlockServiceCacheInitiated(false) + isBlockServiceCacheInitiated(false), + _replicas(nullptr), + _leadersAtOtherLocations(std::make_shared>()) { for (uint16_t i = 0; i < options_.numReaders; ++i) { readerRequestsQueues.emplace_back(std::make_unique>(READER_QUEUE_SIZE)); @@ -179,6 +175,27 @@ struct ShardShared { const UDPSocketPair& sock() const { return socks[0]; } + + inline std::shared_ptr> getReplicas() const { + return std::atomic_load(&_replicas); + } + + inline void setReplicas(const std::array&& replicas) { + std::atomic_store(&_replicas, std::make_shared>(replicas)); + } + + inline std::shared_ptr> getLeadersAtOtherLocations() const { + return std::atomic_load(&_leadersAtOtherLocations); + } + + inline void setLeadersAtOtherLocations(const std::vector&& leadersAtOtherLocations) { + std::atomic_store(&_leadersAtOtherLocations, std::make_shared>(leadersAtOtherLocations)); + } + +private: + // replication information + std::shared_ptr> _replicas; // used for replicating shard within location + std::shared_ptr> _leadersAtOtherLocations; // used for cross location replication }; static bool bigRequest(ShardMessageKind kind) { @@ -359,7 +376,7 @@ private: proxyLocation = false; } else { // try matching to other locations - auto leadersPtr = _shared.leadersAtOtherLocations; + auto leadersPtr = _shared.getLeadersAtOtherLocations(); for (auto& leader : *leadersPtr) { if (leader.locationId == 0 && leader.addrs.contains(msg.clientAddr)) { auto& resp = _proxyLogsDBResponses.emplace_back(); @@ -402,7 +419,7 @@ private: proxyLocation = false; } else { // try matching to other locations - auto leadersPtr = _shared.leadersAtOtherLocations; + auto leadersPtr = _shared.getLeadersAtOtherLocations(); for (auto& leader : *leadersPtr) { if (leader.addrs.contains(msg.clientAddr)) { auto& req = _proxyLogsDBRequests.emplace_back(); @@ -434,7 +451,7 @@ private: } uint8_t _getReplicaId(const IpPort& clientAddress) { - auto replicasPtr = _shared.replicas; + auto replicasPtr = _shared.getReplicas(); if (!replicasPtr) { return LogsDB::REPLICA_COUNT; } @@ -776,7 +793,7 @@ private: uint64_t _requestIdCounter; std::unordered_map _logIdToShardRequest; // used to track which log entry was generated by which request - std::shared_ptr> _replicaInfo; + std::shared_ptr> _replicaInfo; virtual void sendStop() override { _shared.logsDBRequestQueue.close(); @@ -825,8 +842,8 @@ public: if (!_shared.options.isProxyLocation()) { return; } - auto leadersAtOtherLocations = _shared.leadersAtOtherLocations; - AddrsInfo* primaryLeaderAddress = nullptr; + auto leadersAtOtherLocations = _shared.getLeadersAtOtherLocations(); + const AddrsInfo* primaryLeaderAddress = nullptr; for(auto& leader : *leadersAtOtherLocations ) { if (leader.locationId == 0) { primaryLeaderAddress = &leader.addrs; @@ -918,7 +935,7 @@ public: } auto now = ternNow(); - auto leadersAtOtherLocations = _shared.leadersAtOtherLocations; + auto leadersAtOtherLocations = _shared.getLeadersAtOtherLocations(); for (auto it = _crossRegionWaitResponses.begin(); it != _crossRegionWaitResponses.end(); ) { auto& waitInfo = it->second; @@ -1001,7 +1018,7 @@ public: if (!_isLogsDBLeader || _shared.options.isProxyLocation()) { return; } - auto leadersAtOtherLocations = _shared.leadersAtOtherLocations; + auto leadersAtOtherLocations = _shared.getLeadersAtOtherLocations(); if (leadersAtOtherLocations->empty()) { return; } @@ -1143,7 +1160,7 @@ public: // For CDC requests, wait for all secondary leaders to apply // before sending response auto now = ternNow(); - auto leadersAtOtherLocations = _shared.leadersAtOtherLocations; + auto leadersAtOtherLocations = _shared.getLeadersAtOtherLocations(); if ((!leadersAtOtherLocations->empty()) && resp.body.resp.kind() == ShardMessageKind::CREATE_DIRECTORY_INODE) { CrossRegionWaitInfo waitInfo; @@ -1545,7 +1562,7 @@ public: auto waitIt = _crossRegionWaitResponses.find(responseId); if (waitIt != _crossRegionWaitResponses.end()) { // Match source address to location ID - auto leadersAtOtherLocations = _shared.leadersAtOtherLocations; + auto leadersAtOtherLocations = _shared.getLeadersAtOtherLocations(); uint8_t respondingLocationId = 255; // Invalid location ID for (const auto& leader : *leadersAtOtherLocations) { @@ -1786,7 +1803,7 @@ public: _sender.sendMessages(_env, _shared.sock()); } - AddrsInfo* addressFromReplicaId(ReplicaId id) { + const AddrsInfo* addressFromReplicaId(ReplicaId id) { if (!_replicaInfo) { return nullptr; } @@ -1862,7 +1879,7 @@ public: _proxyReadRequests.clear(); _proxyReadRequestsIndices.clear(); - _replicaInfo = _shared.replicas; + _replicaInfo = _shared.getReplicas(); auto hasWork = _shared.writeQueuesWaiter.wait(_nextTimeout); auto remainingPullBudget = _maxWorkItemsAtOnce; if (unlikely(!hasWork && _shared.logsDBRequestQueue.isClosed())) { @@ -2045,7 +2062,8 @@ public: _env.updateAlert(_alert, "AddrsInfo in registry: %s , not matching local AddrsInfo: %s", localReplicas[_shrid.replicaId().u8], _shared.sock().addr()); return false; } - if (unlikely(!_shared.replicas)) { + auto oldReplicas = _shared.getReplicas(); + if (unlikely(!oldReplicas)) { size_t emptyReplicas{0}; for (auto& replica : localReplicas) { if (replica.addrs[0].port == 0) { @@ -2057,13 +2075,13 @@ public: return false; } } - if (unlikely(!_shared.replicas || *_shared.replicas != localReplicas)) { + if (unlikely(!oldReplicas || *oldReplicas != localReplicas)) { LOG_DEBUG(_env, "Updating replicas to %s %s %s %s %s", localReplicas[0], localReplicas[1], localReplicas[2], localReplicas[3], localReplicas[4]); - std::atomic_exchange(&_shared.replicas, std::make_shared>(localReplicas)); + _shared.setReplicas(std::move(localReplicas)); } LOG_DEBUG(_env, "Updating leaders at other locations to %s", leadersAtOtherLocations); - std::atomic_exchange(&_shared.leadersAtOtherLocations, std::make_shared>(std::move(leadersAtOtherLocations))); + _shared.setLeadersAtOtherLocations(std::move(leadersAtOtherLocations)); } _shared.isInitiated.store(true, std::memory_order_release); _env.clearAlert(_alert);