// Copyright 2025 XTX Markets Technologies Limited // // SPDX-License-Identifier: GPL-2.0-or-later #pragma once #include "LogsDB.hpp" #include "MultiplexedChannel.hpp" #include "Random.hpp" #include "RegistryCommon.hpp" #include "RegistryKey.hpp" static constexpr std::array RegistryProtocols = { LOG_REQ_PROTOCOL_VERSION, LOG_RESP_PROTOCOL_VERSION, }; using RegistryChannel = MultiplexedChannel; constexpr int MAX_RECV_MSGS = 500; struct RegistryRequest { uint64_t requestId; RegistryReqContainer req; }; struct RegistryResponse { uint64_t requestId; RegistryRespContainer resp; }; class RegistryServer { public: RegistryServer(const RegistryOptions &options, Env& env) : _options(options.serverOptions), _maxConnections(options.maxConnections), _env(env), _socks({UDPSocketPair(_env, _options.addrs)}), _boundAddresses(_socks[0].addr()), _lastRequestId(0), _sender(UDPSenderConfig{.maxMsgSize = MAX_UDP_MTU}), _packetDropRand(ternNow().ns), _outgoingPacketDropProbability(_options.simulateOutgoingPacketDrop) { _receiver = std::make_unique>(UDPReceiverConfig{ .perSockMaxRecvMsg = MAX_RECV_MSGS, .maxMsgSize = MAX_UDP_MTU}); expandKey(RegistryKey, _expandedRegistryKey); } virtual ~RegistryServer() = default; bool init(); inline const AddrsInfo& boundAddresses() const { return _boundAddresses; } inline void setReplicas(std::shared_ptr> replicaInfo) { _replicaInfo = replicaInfo; } bool receiveMessages(Duration timeout); inline std::vector& receivedLogsDBRequests() { return _logsDBRequests; } inline std::vector& receivedLogsDBResponses() { return _logsDBResponses; } inline std::vector& receivedRegistryRequests() { return _receivedRequests; } void sendLogsDBMessages(std::vector& requests, std::vector& responses); void sendRegistryResponses(std::vector& responses); private: const ServerOptions _options; uint32_t _maxConnections; Env& _env; std::array _socks; // in an array to play with UDPReceiver<> const AddrsInfo _boundAddresses; std::unique_ptr> _receiver; RegistryChannel _channel; AES128Key _expandedRegistryKey; std::array _sockFds{-1, -1}; int _epollFd = -1; static constexpr int MAX_EVENTS = 1024; std::array _events; struct Client { int fd; std::string readBuffer; std::string writeBuffer; TernTime lastActive; size_t messageBytesProcessed; uint64_t inFlightRequestId; }; std::unordered_map _clients; uint64_t _lastRequestId; std::unordered_map _inFlightRequests; // request to fd mapping std::vector _receivedRequests; std::shared_ptr> _replicaInfo; // log entries buffers std::vector _logsDBRequests; // requests from other replicas std::vector _logsDBResponses; // responses from other replicas // outgoing network UDPSender _sender; RandomGenerator _packetDropRand; uint64_t _outgoingPacketDropProbability; // probability * 10,000 void _acceptConnection(int fd); void _removeClient(int fd); void _readClient(int fd); void _writeClient(int fd, bool registerEpoll = false); uint8_t _getReplicaId(const IpPort &clientAddress); AddrsInfo* _addressFromReplicaId(ReplicaId id); void _handleLogsDBResponse(UDPMessage &msg); void _handleLogsDBRequest(UDPMessage &msg); void _sendResponse(int fd, RegistryRespContainer &resp); void _packLogsDBResponse(LogsDBResponse &response); void _packLogsDBRequest(LogsDBRequest &request); };